Django batching/bulk update_or_create?

mfpqipee  于 2023-08-08  发布在  Go
关注(0)|答案(6)|浏览(202)

我的数据库中有数据需要定期更新。数据源返回在该时间点可用的所有内容,因此将包括数据库中尚未存在的新数据。
在循环源数据时,如果可能的话,我不希望进行1000次单独的写入。
有没有像update_or_create这样的东西可以批量工作?
一个想法是将update_or_create与手动事务结合使用,但我不确定这是否只是将单个写入排队,或者是否将所有写入组合到一个SQL插入中?
或者类似地,在循环中使用update_or_create的函数上使用@commit_on_success()可以工作吗?
除了翻译数据并将其保存到模型中之外,我没有对数据做任何事情。在循环期间,没有任何东西依赖于该模型。

wfsdck30

wfsdck301#

从Django 4.1开始,**bulk_create方法通过update_conflicts支持 upserts,这是与update_or_create**相同的单查询批量:

class Foo(models.Model):
    a = models.IntegerField(unique=True)
    b = models.IntegerField()

queryset = [Foo(1, 1), Foo(1, 2)]

Foo.objects.bulk_create(
    queryset, 
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)

字符串

c3frrgcw

c3frrgcw2#

自从Django增加了对bulk_update的支持,现在这在某种程度上是可能的,尽管每个批处理需要执行3个数据库调用(一个get,一个bulk create和一个bulk update)。在这里为通用函数创建一个良好的接口有点挑战,因为您希望函数既支持高效查询又支持更新。这里是我实现的一个方法,它是为批量update_or_create设计的,其中您有许多公共标识键(可以是空的)和一个标识键在批次中不同。
这是作为基础模型上的方法实现的,但可以独立于基础模型使用。这也假设基础模型在名为updated_on的模型上有一个auto_now时间戳;如果不是这种情况,则代码中假设这一点的行已经被注解以便于修改。
为了批量使用它,请在调用它之前将更新分成批量。这也是一种绕过数据的方法,这些数据可能具有次要标识符的少量值中的一个,而不必更改接口。

class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

字符串
示例用法:

class Event(BaseModel):
    organization = models.ForeignKey(Organization)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()

organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)

可能的竞赛条件

上面的代码利用事务和select-for-update来防止更新上的竞争条件。但是,如果两个线程或进程试图创建具有相同标识符的对象,则插入可能存在争用条件。
简单的缓解方法是确保common_keys和unique_key的组合是数据库强制的唯一性约束(这是该函数的预期用途)。这可以通过unique_key引用具有unique=True的字段来实现,或者通过unique_key与common_keys的子集结合来实现,这些子集通过UniqueConstraint一起强制作为唯一的)。使用数据库强制的唯一性保护,如果多个线程试图执行冲突创建,则除一个线程外,所有线程都将失败,并产生IntegrityError。由于封闭事务,失败的线程将不会执行任何更改,并且可以安全地重试或忽略(失败的冲突创建可以被视为首先发生的创建,然后立即被覆盖)。
如果不可能利用唯一性约束,那么您需要实现自己的并发控制或lock the entire table

0yycz8jy

0yycz8jy3#

批量更新将是一个upsert命令,就像@imposeren说的,Postgres 9.5给了你这种能力。我认为Mysql 5.7也可以(参见http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),这取决于您的确切需求。也就是说,使用db游标可能是最简单的。这没什么不对,它是在那里当ORM只是不够。
沿着这些路线应该行得通。这是伪代码,所以不要只是剪切-n-粘贴它,但概念是有你。

class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle

def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()   
        for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.execute_many(upsert_sql, chunk)

字符串
这里的假设是:

  • db_results是某种结果迭代器,可以在列表或字典中使用
  • db_results的结果可以直接输入原始sql exec语句
  • 如果任何批处理更新失败,您将回滚所有批处理更新。如果您想为每个块移动到,只需将with块向下推一点
busg9geu

busg9geu5#

我一直在使用@Zags的答案,我认为这是最好的解决方案。但我想给他一些建议。

update_fields = {"updated_on"}
        updates = []
        for key, obj in existing_objs.items():
            obj.update(unique_key_to_defaults[key], save=False)
            update_fields.update(unique_key_to_defaults[key].keys())
            updates.append(obj)
        if existing_objs:
            cls.objects.bulk_update(updates, update_fields)

字符串
如果你使用auto_now=True字段,如果你使用.update()或bulk_update(),它们将不会被更新,这是因为字段“auto_now”使用.保存()触发,正如你可以在文档中阅读的那样。
如果你有auto_now字段F.e:updated_on,最好将其显式添加到unique_key_to_defaults中。

"unique_value" : {
        "field1.." : value...,
        "updated_on" : timezone.now()
    }...

gt0wga4j

gt0wga4j6#

我是这样做的:

# INPUTS

product_names = ['apple','orange','mango', 'grapes', 'pear']
field_name = 'rating'
field_vale = 4

# BULK UPDATE

existing_products = Product.objects.filter(name__in=product_names)
existing_product_names = []

for product in existing_products:
   setattr(product, field_name, field_vale)
   existing_product_names.append(product.name)

Product.objects.bulk_update(existing_products, [field_name])

# BULK CREATE

new_product_names = set(product_names).difference(existing_product_names)

Product.objects.bulk_create([
   Product(name=new_product_name, rating=field_vale)
] for new_product_name in new_product_names)

字符串
总共3个查询将总是发生,而不是n个查询(如果你是在循环中更新和创建)。
注意:我认为产品的名称字段是唯一的上面的例子。

相关问题