c++ 原子递增并分配给另一个原子

9q78igpj  于 2023-01-15  发布在  其他
关注(0)|答案(1)|浏览(162)

假设我有一些全局的:

std::atomic_int next_free_block;

以及多个线程,每个线程都可以访问

std::atomic_int child_offset;

我希望以连续的方式将空闲块分配给子偏移量,也就是说,我希望原子地执行以下操作:

if (child_offset != 0) child_offset = next_free_block++;

显然,上面的实现不起作用,因为多个线程可能会进入if语句的主体,然后试图将不同的块分配给child_offset
我还考虑了以下几点:

int expected = child_offset;
do {
    if (expected == 0) break;
    int updated = next_free_block++;
} while (!child_offset.compare_exchange_weak(&expected, updated);

但这也不起作用,因为如果CAS失败,即使没有给child_offset赋值,递增next_free_block的副作用也会存在,这会在空闲块的分配中留下空隙。
我知道我可以在每个child_offset和潜在的DCLP周围使用互斥锁(或某种自旋锁)来实现这一点,但我想知道这是否可以通过原子操作高效地实现。
其使用情形如下:我有一个并行构建的大型树,它是一个数组,包含以下内容:

struct tree_page {
    atomic<uint32_t> allocated;
    uint32_t child_offset[8];
    uint32_t nodes[1015];
};

树是逐层构建的:首先创建深度0处的节点,然后创建深度1处的节点,等等。在前一步骤中为每个非叶节点分派单独的线程。如果页面中没有留下更多的空间,从全局next_free_page分配新页面,其指向struct tree_page阵列中的第一未使用页面,并被分配给child_ptr的元素。然后在节点字中设置位字段,该位字段指示应当使用child_ptr数组的哪个元素来查找节点的子节点。
我尝试编写的代码如下所示:

int expected = allocated.load(relaxed), updated;
do {
    updated = expected + num_children;
    if (updated > NODES_PER_PAGE) {
        expected = -1; break;
    }
} while (!allocated.compare_exchange_weak(&expected, updated));

if (expected != -1) {
    // successfully allocated in the same page
} else {
    for (int i = 0; i < 8; ++i) {
        // this is the operation I would like to be atomic
        if (child_offset[i] == 0) 
            child_offset[i] = next_free_block++;
        
        int offset = try_allocating_at_page(pages[child_offset[i]]);
        if (offset != -1) {
            // successfully allocated at child_offset i
            // ...

            break;
        }
    }
}
6uxekuva

6uxekuva1#

根据我对您描述的理解,您的child_offset数组最初是用0填充的,然后由不同的线程并发地填充一些具体的值。
在这种情况下,你可以先原子地“标记”值,如果你成功了,就分配有效的值。

constexpr int INVALID_VALUE = -1;

    for (int i = 0; i < 8; ++i) {
        int expected = 0;
        // this is the operation I would like to be atomic
        if (child_offset[i].compare_exchange_weak(expected, INVALID_VALUE)) {
            child_offset[i] = next_free_block++;
        }
        // Not sure if this is needed in your environment, but just in case
        if (child_offset[i] == INVALID_VALUE) continue;
        ...
    }

这并不能保证child_offset数组中的所有值都是升序排列的,但是如果你需要的话,为什么不在不涉及多线程的情况下填充呢?

相关问题