c++ 如何有效地使用`std::sort`来处理具有运行时记录长度的不透明数据类型?

jtjikinw  于 2023-05-30  发布在  其他
关注(0)|答案(2)|浏览(146)

提问

我想使用std::sort作为运行时记录长度的不透明数据类型。下面是一个answer的类似问题。

尝试次数

我尝试过间接排序记录指针和索引,但这些方法的缓存局部性很差,因为比较需要访问数据集随机分布的行。
我还尝试使用C函数qsort,它允许运行时记录长度。这是更好的,但它需要一个回调函数进行比较。这有两个缺点,1)与使用可以内联的lambda相比,性能受到影响,2)界面人机工程学很差,因为你(可能)必须创建一个结构来保存一个用于比较的函子。

可能的想法

前面提到的答案是A。
pod块伪引用迭代器,带有专门的交换,
但没有给予任何关于如何运作的暗示。

摘要

总而言之,我想要一种方法来调用std::sort,传入一个lambda,用于比较具有运行时记录长度的不透明数据类型数组,如以下代码所示:

#include <iostream>
#include <random>

using std::cout, std::endl;

int main(int argc, const char *argv[]) {
    // We have 100 records each with 50 bytes.
    int nrecords = 100, nbytes = 50;
    std::vector<uint8_t> data(nrecords * nbytes);

    // Generate random data for testing.
    std::uniform_int_distribution<uint8_t> d;
    std::mt19937_64 rng;
    std::generate(data.begin(), data.end(), [&]() { return d(rng); });

    int key_index = 20;
    auto cmp = [&](uint8_t *a, uint8_t *b) {
        int aval = *(int*)(a + key_index), bval = *(int*)(b + key_index);
        return aval < bval;
    };

    // How can I call std::sort with run-time record length?
    // std::sort(data.begin(), ...)
    return 0;
}

迭代器尝试

我试图根据前面的建议编写一个迭代器,但我还不能编译它。我的 iterator-foo 是低于标准的,我觉得我犯了一个概念性的错误。
这里是代码,

#include <algorithm>
#include <random>

struct MemoryIterator {
    using iterator_category = std::forward_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = uint8_t *;
    using pointer = value_type*;
    using reference = value_type&;

    MemoryIterator(value_type ptr, size_t element_size)
        : ptr_(ptr)
        , element_size_(element_size) {
    }

    reference operator*() {
        return ptr_;
    }

    MemoryIterator& operator++() {
        ptr_ += element_size_;
        return *this;
    }

    MemoryIterator& operator--() {
        ptr_ -= element_size_;
        return *this;
    }

    MemoryIterator operator++(int) {
        auto tmp = *this;
        ++(*this);
        return tmp;
    }

    MemoryIterator operator--(int) {
        auto tmp = *this;
        --(*this);
        return tmp;
    }

    MemoryIterator& operator+=(size_t n) {
        ptr_ += n * element_size_;
        return *this;
    }

    MemoryIterator operator+(size_t n) {
        auto r = *this;
        r += n;
        return r;
    }

    MemoryIterator& operator-=(size_t n) {
        ptr_ -= n * element_size_;
        return *this;
    }

    MemoryIterator operator-(size_t n) {
        auto r = *this;
        r -= n;
        return r;
    }
    friend bool operator==(const MemoryIterator& a, const MemoryIterator& b) {
        return a.ptr_ == b.ptr_;
    }

    friend difference_type operator-(const MemoryIterator& a, const MemoryIterator& b) {
        return a.ptr_ - b.ptr_;
    }

    friend void swap(MemoryIterator a, MemoryIterator b) {
    }

private:
    value_type ptr_;
    size_t element_size_;
};

int main(int argc, const char *argv[]) {
    int nrecords = 100, nbytes = 50;
    std::vector<uint8_t> data(nrecords * nbytes);

    std::uniform_int_distribution<uint8_t> d;
    std::mt19937_64 rng;
    std::generate(data.begin(), data.end(), [&]() { return d(rng); });

    int key_index = 20;
    auto cmp = [&](uint8_t *a, uint8_t *b) {
        int aval = *(int*)(a + key_index), bval = *(int*)(b + key_index);
        return aval < bval;
    };

    MemoryIterator begin(data.data(), nbytes);
    MemoryIterator end(data.data() + data.size(), nbytes); 
    std::sort(begin, end, cmp);

    return 0;
}

它抱怨说它不能比较两个MemoryIterator,这是真的--它需要使用比较函数。

编译错误

/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:533:21: error:
      invalid operands to binary expression ('MemoryIterator' and 'MemoryIterator')
            if (__i >= __j)
                ~~~ ^  ~~~
/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:639:8: note:
      in instantiation of function template specialization
      'std::__introsort<std::_ClassicAlgPolicy, (lambda at
      sort0.cpp:92:16) &, MemoryIterator>' requested here
  std::__introsort<_AlgPolicy, _Compare>(__first, __last, __comp, __depth_limit);
       ^
/opt/local/libexec/llvm-16/bin/../include/c++/v1/__algorithm/sort.h:699:10: note:
      in instantiation of function template specialization 'std::__sort<(lambda at
      sort0.cpp:92:16) &, MemoryIterator>' requested here
    std::__sort<_WrappedComp>(std::__unwrap_iter(__first), std::__unwrap_iter(__last), _...
1sbrub3j

1sbrub3j1#

以下是让std::sort按您所需的方式工作所需的要素:

代理引用

代理引用是模仿C++引用的用户定义类型(尽可能多)。通过它们,字节数组中的元素实际上被移动了std::sort
请注意,对于一个可能比sort做得更多的更健壮的实现,您还应该提供一个ConstRecordReference,类似于标准库容器中的iteratorconst_iterator

struct RecordReference {
    explicit RecordReference(void* data, size_t element_size):
        _data(data), _size(element_size) {}
    
    RecordReference(Record&);
    
    RecordReference(RecordReference const& rhs):
        _data(rhs.data()), _size(rhs.size()) {}
    
    /// Because this represents a reference, an assignment represents a copy of
    /// the referred-to value, not of the reference
    RecordReference& operator=(RecordReference const& rhs) {
        assert(size() == rhs.size());
        std::memcpy(data(), rhs.data(), size());
        return *this;
    }
    
    RecordReference& operator=(Record const& rhs);
    
    /// Also `swap` swaps 'through' the reference
    friend void swap(RecordReference a, RecordReference b) {
        assert(a.size() == b.size());
        size_t size = a.size();
        // Perhaps don't use alloca if you're really serious
        // about standard conformance
        auto* buffer = (void*)alloca(size);
        std::memcpy(buffer, a.data(), size);
        std::memcpy(a.data(), b.data(), size);
        std::memcpy(b.data(), buffer, size);
    }
    
    void* data() const { return _data; }
    
    size_t size() const { return _size; }
    
private:
    void* _data;
    size_t _size;
};

/// After the definition of Record (see below):

RecordReference::RecordReference(Record& rhs):
    _data(rhs.data()), _size(rhs.size()) {}

RecordReference& RecordReference::operator=(Record const& rhs) {
    assert(size() == rhs.size());
    std::memcpy(data(), rhs.data(), size());
    return *this;
}

表示记录的类类型

我们甚至需要这个,这是相当烦人的。我们需要它,因为libstdc++的std::sort实现(可能是任何实现)创建了一些我们值的堆栈分配副本,所以我们需要为我们的迭代器value_type提供一些实际表示记录的合理内容。
我们提供了一个内联缓冲区,以避免堆分配到一个合理的大小。您可以将InlineSize参数调整为预期的记录大小。

struct Record {
    static constexpr size_t InlineSize = 56; 
    
    Record(RecordReference ref): _size(ref.size()) {
        if (is_inline()) {
            std::memcpy(&inline_data, ref.data(), size());
        }
        else {
            heap_data = std::malloc(size());
            std::memcpy(heap_data, ref.data(), size());
        }
    }

    Record(Record&& rhs) noexcept: _size(rhs.size()) {
        if (is_inline()) {
            std::memcpy(&inline_data, &rhs.inline_data, size());
        }
        else {
            heap_data = rhs.heap_data;
            rhs.heap_data = nullptr;
        }
    }
    
    ~Record() {
        if (!is_inline()) {
            std::free(heap_data);
        }
    }
    
    void* data() { return (void*)((Record const*)this)->data(); }
    
    void const* data() const { return is_inline() ? inline_data : heap_data; }
    
    size_t size() const { return _size; }
    
private:
    bool is_inline() const { return _size <= InlineSize; }
    
    size_t _size;
    union {
        char inline_data[InlineSize];
        void* heap_data;
    };
};

然而,似乎至少libc++的std::sort实现一次只创建一个这种类型的对象。这意味着我们可以在堆栈上预分配一个thread_local缓冲区来保存单个对象。通过这种方式,我们可以防止任何记录大小的堆分配,代价是依赖于std::sort的实现细节,并可能在未来的构建中产生 * 运行时 * 错误,所以我不建议这样做。这也使得这些类型对许多其他算法无用。

最后是迭代器类

正如注解中所指出的,它需要定义关系比较运算符(以满足 random access iterator 的要求,或者只是因为sort逻辑上需要它们)。
它通过value_typereference typedef公开我们的RecordRecordReference类型。

struct MemoryIterator {
    using iterator_category = std::random_access_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = Record;
    using pointer = void*;
    using reference = RecordReference;

    MemoryIterator(pointer ptr, size_t size)
        : ptr_((char*)ptr)
        , size_(size) {
    }

    reference operator*() const {
        return reference(ptr_, size_);
    }

    reference operator[](size_t index) const {
        return *(*this + index);
    }

    MemoryIterator& operator++() {
        ptr_ += size_;
        return *this;
    }

    MemoryIterator& operator--() {
        ptr_ -= size_;
        return *this;
    }

    MemoryIterator operator++(int) {
        auto tmp = *this;
        ++(*this);
        return tmp;
    }

    MemoryIterator operator--(int) {
        auto tmp = *this;
        --(*this);
        return tmp;
    }

    MemoryIterator& operator+=(size_t n) {
        ptr_ += n * size_;
        return *this;
    }

    MemoryIterator operator+(size_t n) const {
        auto r = *this;
        r += n;
        return r;
    }

    MemoryIterator& operator-=(size_t n) {
        ptr_ -= n * size_;
        return *this;
    }

    MemoryIterator operator-(size_t n) const {
        auto r = *this;
        r -= n;
        return r;
    }
    
    friend bool operator==(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return a.ptr_ == b.ptr_;
    }
    
    friend std::strong_ordering operator<=>(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return a.ptr_ <=> b.ptr_;
    }

    friend difference_type operator-(const MemoryIterator& a, const MemoryIterator& b) {
        assert(a.size_ == b.size_);
        return (a.ptr_ - b.ptr_) / a.size_;
    }

private:
    char* ptr_;
    size_t size_;
};

Here you can find a live demo of the code.

ars1skjm

ars1skjm2#

下面是一些简单的概念验证代码,似乎可以工作。它的工作原理是构造第二个struct向量,指向原始数据中的“记录”,然后对其进行排序。
std::sort可以通过创建一个合适的复制构造函数和复制赋值操作符来做正确的事情。我不知道这是否是最佳的,代码当然可以更好地分解。
好,我们开始:

#include <iostream>
#include <cstring>
#include <algorithm>

using std::cout, std::endl;

struct sortable_record
{
    sortable_record (int record_size, uint8_t *data, uint8_t *swap_buf) :
        record_size (record_size), data (data), swap_buf (swap_buf) { }

    sortable_record (const sortable_record &other)
    {
        data = other.swap_buf;
        operator= (other);
    }

    sortable_record &operator= (const sortable_record &other)
    {
        record_size = other.record_size;
        swap_buf = other.swap_buf;
        memcpy (data, other.data, record_size);
        return *this;
    }

    int record_size = 0;
    uint8_t *data = nullptr;
    uint8_t *swap_buf = nullptr;
};

int main() {
    int nrecords = 10, nbytes = 50;
    std::vector<uint8_t> data(nrecords * nbytes);

    // Generate (simplified!) data for testing.
    for (int i = 0; i < nrecords; ++i)
    {
        std::fill (&data [i * nbytes], &data [i * nbytes] + nbytes, 'z' - i);
        *(&data [i * nbytes] + nbytes - 1) = 0;
    }

    // Populate vector of sortable_record's
    std::vector<uint8_t> swap_buf (nbytes);
    std::vector<sortable_record> sort_me;
    sort_me.reserve (nrecords);
    for (int i = 0; i < nrecords; ++i)
        sort_me.emplace_back (nbytes, &data [i * nbytes], swap_buf.data ());
    
    int key_index = 20;
    auto cmp = [key_index](const sortable_record &a, const sortable_record &b) {
        uint8_t aval = a.data [key_index];
        uint8_t bval = b.data [key_index];
        return aval < bval;
    };

    std::sort (sort_me.begin(), sort_me.end(), cmp);

    for (int i = 0; i < nrecords; ++i)
        std::cout << (const char *) &data [i * nbytes] << endl;
}

请注意,如果没有sort_me.reserve (nrecords);,就会发生不好的事情,因此构建sort_me的代码应该全部捆绑在一个很好的工厂函数中。
Live demo

相关问题