postgresql CREATE SCHEMA IF NOT EXISTS引发重复键错误

jum4pzuy  于 2023-03-12  发布在  PostgreSQL
关注(0)|答案(3)|浏览(486)

为了给予一些上下文,命令在任务内部发出,并且许多任务可能同时从多个工作线程发出相同的命令。
每个任务都试图创建一个postgres模式,我经常得到以下错误:

IntegrityError: (IntegrityError) duplicate key value violates unique constraint "pg_namespace_nspname_index"
DETAIL:  Key (nspname)=(9621584361) already exists.
 'CREATE SCHEMA IF NOT EXISTS "9621584361"'

PostgreSQL版本为PostgreSQL 9.4rc1。
是Postgres中的bug吗?

bvk5enib

bvk5enib1#

这是IF NOT EXISTS在表和模式中的一个小缺陷,基本上,它们是一个upsert尝试,PostgreSQL不能干净地处理竞态条件,它是安全的,但是很难看。
如果模式同时在另一个会话中创建,但还没有提交,那么它可能存在,也可能不存在,这取决于您是谁以及您的外观。其他事务不可能“看到”系统编目中的新模式,因为它还没有提交。所以它在pg_namespace中的条目对其他事务是不可见的,所以CREATE SCHEMA/CREATE TABLE尝试创建它,因为就它而言,这个对象不存在。
但是,这将使用唯一约束向表中插入一行。唯一约束必须能够看到未提交的行才能起作用。因此,插入阻塞(停止),直到进行CREATE的第一事务提交或回滚。如果它提交,则第二事务中止,因为它试图插入一个违反唯一约束的行。CREATE SCHEMA不够聪明,无法捕获这种情况并重试。
为了正确修复这个问题,PostgreSQL可能需要 predicate 锁定,它可以锁定 * 一行的潜力 *。这可能会作为当前实现UPSERT工作的一部分添加进来。
对于这些特殊的命令,PostgreSQL可能会对系统目录进行一次“脏读”(dirty read),在那里它可以看到未提交的更改,然后它可以等待未提交的事务提交或回滚,重新执行脏读以查看是否有其他人在等待。并重试。但是这会有一个争用条件,在您进行读取以检查模式的时间和您尝试创建模式的时间之间,其他人可能会创建模式。
因此IF NOT EXISTS变体必须:

  • 检查架构是否存在;如果是,则不执行任何操作而完成。
  • 尝试创建表
  • 如果创建因唯一约束错误而失败,请在开始时重试
  • 如果表创建成功,则完成

据我所知,没有人实现过,或者他们尝试过,但没有被接受。使用这种方法可能会有事务ID刻录率等问题。
我认为这是一个bug,但它是一个“是的,我们知道”的bug,而不是一个“我们会马上修复它”的bug。至少文档应该提到关于IF NOT EXISTS的警告。
我不建议像那样并发地执行DDL。

hc8w905p

hc8w905p2#

我需要在并发创建模式的应用程序中解决这个限制。

LOCK TABLE pg_catalog.pg_namespace

在包括CREATE SCHEMA的事务中。看起来像是一件肮脏和不安全的事情,但帮助我解决了只在测试中发生的问题。

kpbwa7wx

kpbwa7wx3#

这是一个老主题了,但这里有一个比锁定pg_catalog.pg_namespace表更实用的解决方案。
假设像OP一样,您有多个任务同时运行,但共享相同的源代码。要修复上述争用情况,您需要将DDL(例如CREATE SCHEMA)一起分组到一个事务中,然后在事务级别获取一个建议锁。
这可以这样来完成:

SELECT pg_advisory_xact_lock(bigint on 8 bytes at most);

这个锁在被占用时会等待,或者在空闲时直接执行。但是,bigint锁id必须在所有遇到争用条件的任务中完全相同。如果你知道是代码库的哪一部分导致了这个问题,这是一个很好的解决方案。
我建议使用一个明文字符串作为lock_id(比如"lock for schema ddl"),然后将其散列到bigint中,这样以后就不太可能忘记这个任意值的含义。
在Python中,通过SQLAlchemy,这看起来像这样:

import hashlib

def get_transaction_lock(conn: Connection, lock_id: str, timeout_sec: Optional[int] = 300):
    if timeout_sec is not None:
        conn.execute(text(f"SET LOCAL lock_timeout = '{abs(timeout_sec)}s';"))

    # Lock ID has to be a bigint, meaning on 8 bytes
    # hashlib is necessary to obtain a consistent hash, don't use python's hash() func
    num_lock_id = int(hashlib.sha1(lock_id.encode("utf-8")).hexdigest(), 16) % (2 ** 64)
    conn.execute(text(f"SELECT pg_advisory_xact_lock({num_lock_id});"))

相关问题