C++中的并行循环

kxeu7u2r  于 2022-12-27  发布在  其他
关注(0)|答案(8)|浏览(301)

我想知道在C中是否有一种轻松、直接的方法可以并行计算for和基于范围的-for循环。你会如何实现这样的事情?从Scala中我知道mapfilterforeach函数,也许也可以并行执行这些函数?在C中有简单的方法可以实现吗?
我的主要平台是Linux,但如果它能跨平台工作就更好了。

ssgvzors

ssgvzors1#

使用C++17中的并行算法,我们现在可以用途:

std::vector<std::string> foo;
std::for_each(
    std::execution::par,
    foo.begin(),
    foo.end(),
    [](auto&& item)
    {
        //do stuff with item
    });

以并行计算循环。第一个参数指定execution policy

txu3uszq

txu3uszq2#

你的平台是什么?你可以看看OpenMP,虽然它不是C的一部分。但是它得到了编译器的广泛支持。
至于基于范围的for循环,参见例如Using OpenMP with C++11 range-based for loops?
我也在http://www.open-std.org上看到过一些文档,表明了将并行结构/算法合并到未来C
中的一些努力,但不知道它们的当前状态。

    • 更新**

添加一些示例代码:

template <typename RAIter>
void loop_in_parallel(RAIter first, RAIter last) {
   const size_t n = std::distance(first, last);

   #pragma omp parallel for
   for (size_t i = 0; i < n; i++) {
       auto& elem = *(first + i);
       // do whatever you want with elem
    }
}

线程数可以在运行时通过OMP_NUM_THREADS环境变量设置。

hof1towb

hof1towb3#

使用C++11,你可以用几行代码并行化一个for循环。
我的函数parallel_for()(在后面的文章中定义)将一个for循环分割成更小的块(子循环),并将每个块分配给一个线程。

/// Say you want to parallelize this:
for(int i = 0; i < nb_elements; ++i)
    computation(i);    

/// Then you would do:
parallel_for(nb_elements, [&](int start, int end){ 
    for(int i = start; i < end; ++i)
        computation(i); 
});

我的parallel_for()也可以在类中工作:

struct My_obj {

    /// Replacing:
    void sequential_for(){
        for(int i = 0; i < nb_elements; ++i)
            computation(i);
    }

    /// By:
    void process_chunk(int start, int end)
    {
        for(int i = start; i < end; ++i)
            computation(i);
    }

    void threaded_for(){
        parallel_for(nb_elements, [this](int s, int e){ 
            this->process_chunk(s, e); 
        } );
    }

    
};

最后这里是parallel_for()的实现,只需粘贴一个头文件,随意使用即可:

#include <algorithm>
#include <thread>
#include <functional>
#include <vector>

/// @param[in] nb_elements : size of your for loop
/// @param[in] functor(start, end) :
/// your function processing a sub chunk of the for loop.
/// "start" is the first index to process (included) until the index "end"
/// (excluded)
/// @code
///     for(int i = start; i < end; ++i)
///         computation(i);
/// @endcode
/// @param use_threads : enable / disable threads.
///
///
static
void parallel_for(unsigned nb_elements,
                  std::function<void (int start, int end)> functor,
                  bool use_threads = true)
{
    // -------
    unsigned nb_threads_hint = std::thread::hardware_concurrency();
    unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

    unsigned batch_size = nb_elements / nb_threads;
    unsigned batch_remainder = nb_elements % nb_threads;

    std::vector< std::thread > my_threads(nb_threads);

    if( use_threads )
    {
        // Multithread execution
        for(unsigned i = 0; i < nb_threads; ++i)
        {
            int start = i * batch_size;
            my_threads[i] = std::thread(functor, start, start+batch_size);
        }
    }
    else
    {
        // Single thread execution (for easy debugging)
        for(unsigned i = 0; i < nb_threads; ++i){
            int start = i * batch_size;
            functor( start, start+batch_size );
        }
    }

    // Deform the elements left
    int start = nb_threads * batch_size;
    functor( start, start+batch_remainder);

    // Wait for the other thread to finish their task
    if( use_threads )
        std::for_each(my_threads.begin(), my_threads.end(), std::mem_fn(&std::thread::join));
}

最后,您可以定义宏,以获得更紧凑的表达式:

#define PARALLEL_FOR_BEGIN(nb_elements) parallel_for(nb_elements, [&](int start, int end){ for(int i = start; i < end; ++i)
#define PARALLEL_FOR_END()})

现在转换序列:

for(int i = 0; i < nb_elements; ++i)
    computation(i);

只不过是一个做的问题:

PARALLEL_FOR_BEGIN(nb_edges)
{
    computation(i);
}PARALLEL_FOR_END();
wwtsj6pe

wwtsj6pe4#

如果您愿意让C++运行时控制并行性,std::async可能非常适合这里。
示例来自cppreference.com:

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>

template <typename RAIter>
int parallel_sum(RAIter beg, RAIter end)
{
    auto len = end - beg;
    if(len < 1000)
        return std::accumulate(beg, end, 0);

    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                              parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}

int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
}
9ceoxa92

9ceoxa925#

这可以使用threads库函数来完成,具体来说就是pthreads库函数,它可以用来并发地执行操作。
您可以在这里阅读更多关于他们的信息:http://www.tutorialspoint.com/cplusplus/cpp_multithreading.htm
也可以使用std::thread:http://www.cplusplus.com/reference/thread/thread/
下面的代码使用每个线程的线程ID将数组分成两半:

#include <iostream>
#include <cstdlib>
#include <pthread.h>

using namespace std;

#define NUM_THREADS 2

int arr[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

void *splitLoop(void *threadid)
{
   long tid;
   tid = (long)threadid;
   //cout << "Hello World! Thread ID, " << tid << endl;
   int start = (tid * 5);
   int end = start + 5;
   for(int i = start;i < end;i++){
      cout << arr[i] << " ";
   }
   cout << endl;
   pthread_exit(NULL);
}

int main ()
{
   pthread_t threads[NUM_THREADS];
   int rc;
   int i;
   for( i=0; i < NUM_THREADS; i++ ){
      cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, 
                          splitLoop, (void *)i);
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

还要记住编译时必须使用-lpthread标志。
Ideone上的解决方案链接:http://ideone.com/KcsW4P

w3nuxt5m

w3nuxt5m6#

因为这个线程几乎是我每次寻找并行化方法时的答案,所以我决定在arkan的方法基础上再加一点(见他的答案)。
下面的两个方法几乎是相同的,并且语法简单。只需在项目中包含头文件,然后调用其中一个并行版本:
示例:

#include "par_for.h"

int main() {
//replace - 
for(unsigned i = 0; i < 10; ++i){
    std::cout << i << std::endl;
}

//with -
//method 1:
pl::thread_par_for(0, 10, [&](unsigned i){
            std::cout << i << std::endl;   //do something here with the index i
        });   //changing the end to },false); will make the loop sequential

//or method 2:
pl::async_par_for(0, 10, [&](unsigned i){
            std::cout << i << std::endl;   //do something here with the index i
        });   //changing the end to },false); will make the loop sequential

return 0;
}

头文件-par_for. h:

#include <thread>
#include <vector>
#include <functional>
#include <future>

namespace pl{

    void thread_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){

        //internal loop
        auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
            for (unsigned j = int_start; j < int_start+seg_size; j++){
                fn(j);
            }
        };

        //sequenced for
        if(!par){
            return int_fn(start, end);
        }

        //get number of threads
        unsigned nb_threads_hint = std::thread::hardware_concurrency();
        unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

        //calculate segments
        unsigned total_length = end - start;
        unsigned seg = total_length/nb_threads;
        unsigned last_seg = seg + total_length%nb_threads;

        //launch threads - parallel for
        auto threads_vec = std::vector<std::thread>();
        threads_vec.reserve(nb_threads);
        for(int k = 0; k < nb_threads-1; ++k){
            unsigned current_start = seg*k;
            threads_vec.emplace_back(std::thread(int_fn, current_start, seg));
        }
        {
            unsigned current_start = seg*(nb_threads-1);
            threads_vec.emplace_back(std::thread(int_fn, current_start, last_seg));
        }
        for (auto& th : threads_vec){
            th.join();
        }
    }



    void async_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){

        //internal loop
        auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
            for (unsigned j = int_start; j < int_start+seg_size; j++){
                fn(j);
            }
        };

        //sequenced for
        if(!par){
            return int_fn(start, end);
        }

        //get number of threads
        unsigned nb_threads_hint = std::thread::hardware_concurrency();
        unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

        //calculate segments
        unsigned total_length = end - start;
        unsigned seg = total_length/nb_threads;
        unsigned last_seg = seg + total_length%nb_threads;

        //launch threads - parallel for
        auto fut_vec = std::vector<std::future<void>>();
        fut_vec.reserve(nb_threads);
        for(int k = 0; k < nb_threads-1; ++k){
            unsigned current_start = seg*k;
            fut_vec.emplace_back(async(int_fn, current_start, seg));
        }
        {
            unsigned current_start = seg*(nb_threads-1);
            fut_vec.emplace_back(std::async(std::launch::async, int_fn, current_start, last_seg));
        }
        for (auto& th : fut_vec){
            th.get();
        }
    }
}

一些简单的测试表明,使用async的方法速度更快,可能是因为标准库控制是否实际启动新线程。

kcrjzv8t

kcrjzv8t7#

Concurrency::parallel_for(PPL)也是实现任务并行的一个不错的选项。
取自C++ Coding Exercise – Parallel For – Monte Carlo PI Calculation

int main() {
    srand(time(NULL)); // seed
    const int N1 = 1000;
    const int N2 = 100000;
    int n = 0;
    int c = 0;
    Concurrency::critical_section cs;
    // it is better that N2 >> N1 for better performance
    Concurrency::parallel_for(0, N1, [&](int i) {
        int t = monte_carlo_count_pi(N2);
        cs.lock(); // race condition
        n += N2;   // total sampling points
        c += t;    // points fall in the circle
        cs.unlock();
    });
    cout < < "pi ~= " << setprecision(9) << (double)c / n * 4.0 << endl;
    return 0;
}
vmdwslir

vmdwslir8#

从C17开始,std::for_each有允许parallel execution的重载。然而,在我的例子中,我的算法需要特定数量的线程来优化执行,而VS 2022中的std::for_each实现使用基于std::thread::hardware_concurrency的线程数量。
对于那些希望能够控制并行工作者数量的人来说,this simple implementation的行为应该与std::for_each类似,而不需要C
17:

template <class Iter, class Func>
void parallel_for_each(unsigned threadCount, Iter first, Iter last, Func func)
{
    Iter it = first;
    if (it == last)
        return;
    if (++it == last)
    {
        func(*first);
        return;
    }

    if (threadCount == 0)
        threadCount = std::max(2u, std::thread::hardware_concurrency());

    std::mutex mx;
    std::vector<std::thread> threads;
    threads.reserve(threadCount - 1);

    auto func2 = [&]() {
        for (;;)
        {
            Iter it;
            {
                std::lock_guard<std::mutex> lock(mx);
                it = first;
                if (it == last)
                    break;
                ++first;
            }
            func(*it);
        }
    };
    for (unsigned i = 0; i < threadCount - 1; ++i, ++it)
    {
        if (it == last)
            break;
        threads.emplace_back(std::thread(func2));
    }
    func2();
    for (auto& th : threads)
        th.join();
}

template <class Iter, class Func>
void parallel_for_each(Iter first, Iter last, Func func)
{
    parallel_for_each(std::thread::hardware_concurrency(), first, last, func);
}

相关问题