/// Say you want to parallelize this:
for(int i = 0; i < nb_elements; ++i)
computation(i);
/// Then you would do:
parallel_for(nb_elements, [&](int start, int end){
for(int i = start; i < end; ++i)
computation(i);
});
我的parallel_for()也可以在类中工作:
struct My_obj {
/// Replacing:
void sequential_for(){
for(int i = 0; i < nb_elements; ++i)
computation(i);
}
/// By:
void process_chunk(int start, int end)
{
for(int i = start; i < end; ++i)
computation(i);
}
void threaded_for(){
parallel_for(nb_elements, [this](int s, int e){
this->process_chunk(s, e);
} );
}
};
最后这里是parallel_for()的实现,只需粘贴一个头文件,随意使用即可:
#include <algorithm>
#include <thread>
#include <functional>
#include <vector>
/// @param[in] nb_elements : size of your for loop
/// @param[in] functor(start, end) :
/// your function processing a sub chunk of the for loop.
/// "start" is the first index to process (included) until the index "end"
/// (excluded)
/// @code
/// for(int i = start; i < end; ++i)
/// computation(i);
/// @endcode
/// @param use_threads : enable / disable threads.
///
///
static
void parallel_for(unsigned nb_elements,
std::function<void (int start, int end)> functor,
bool use_threads = true)
{
// -------
unsigned nb_threads_hint = std::thread::hardware_concurrency();
unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
unsigned batch_size = nb_elements / nb_threads;
unsigned batch_remainder = nb_elements % nb_threads;
std::vector< std::thread > my_threads(nb_threads);
if( use_threads )
{
// Multithread execution
for(unsigned i = 0; i < nb_threads; ++i)
{
int start = i * batch_size;
my_threads[i] = std::thread(functor, start, start+batch_size);
}
}
else
{
// Single thread execution (for easy debugging)
for(unsigned i = 0; i < nb_threads; ++i){
int start = i * batch_size;
functor( start, start+batch_size );
}
}
// Deform the elements left
int start = nb_threads * batch_size;
functor( start, start+batch_remainder);
// Wait for the other thread to finish their task
if( use_threads )
std::for_each(my_threads.begin(), my_threads.end(), std::mem_fn(&std::thread::join));
}
最后,您可以定义宏,以获得更紧凑的表达式:
#define PARALLEL_FOR_BEGIN(nb_elements) parallel_for(nb_elements, [&](int start, int end){ for(int i = start; i < end; ++i)
#define PARALLEL_FOR_END()})
现在转换序列:
for(int i = 0; i < nb_elements; ++i)
computation(i);
#include "par_for.h"
int main() {
//replace -
for(unsigned i = 0; i < 10; ++i){
std::cout << i << std::endl;
}
//with -
//method 1:
pl::thread_par_for(0, 10, [&](unsigned i){
std::cout << i << std::endl; //do something here with the index i
}); //changing the end to },false); will make the loop sequential
//or method 2:
pl::async_par_for(0, 10, [&](unsigned i){
std::cout << i << std::endl; //do something here with the index i
}); //changing the end to },false); will make the loop sequential
return 0;
}
头文件-par_for. h:
#include <thread>
#include <vector>
#include <functional>
#include <future>
namespace pl{
void thread_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){
//internal loop
auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
for (unsigned j = int_start; j < int_start+seg_size; j++){
fn(j);
}
};
//sequenced for
if(!par){
return int_fn(start, end);
}
//get number of threads
unsigned nb_threads_hint = std::thread::hardware_concurrency();
unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
//calculate segments
unsigned total_length = end - start;
unsigned seg = total_length/nb_threads;
unsigned last_seg = seg + total_length%nb_threads;
//launch threads - parallel for
auto threads_vec = std::vector<std::thread>();
threads_vec.reserve(nb_threads);
for(int k = 0; k < nb_threads-1; ++k){
unsigned current_start = seg*k;
threads_vec.emplace_back(std::thread(int_fn, current_start, seg));
}
{
unsigned current_start = seg*(nb_threads-1);
threads_vec.emplace_back(std::thread(int_fn, current_start, last_seg));
}
for (auto& th : threads_vec){
th.join();
}
}
void async_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){
//internal loop
auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
for (unsigned j = int_start; j < int_start+seg_size; j++){
fn(j);
}
};
//sequenced for
if(!par){
return int_fn(start, end);
}
//get number of threads
unsigned nb_threads_hint = std::thread::hardware_concurrency();
unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
//calculate segments
unsigned total_length = end - start;
unsigned seg = total_length/nb_threads;
unsigned last_seg = seg + total_length%nb_threads;
//launch threads - parallel for
auto fut_vec = std::vector<std::future<void>>();
fut_vec.reserve(nb_threads);
for(int k = 0; k < nb_threads-1; ++k){
unsigned current_start = seg*k;
fut_vec.emplace_back(async(int_fn, current_start, seg));
}
{
unsigned current_start = seg*(nb_threads-1);
fut_vec.emplace_back(std::async(std::launch::async, int_fn, current_start, last_seg));
}
for (auto& th : fut_vec){
th.get();
}
}
}
int main() {
srand(time(NULL)); // seed
const int N1 = 1000;
const int N2 = 100000;
int n = 0;
int c = 0;
Concurrency::critical_section cs;
// it is better that N2 >> N1 for better performance
Concurrency::parallel_for(0, N1, [&](int i) {
int t = monte_carlo_count_pi(N2);
cs.lock(); // race condition
n += N2; // total sampling points
c += t; // points fall in the circle
cs.unlock();
});
cout < < "pi ~= " << setprecision(9) << (double)c / n * 4.0 << endl;
return 0;
}
8条答案
按热度按时间ssgvzors1#
使用C++17中的并行算法,我们现在可以用途:
以并行计算循环。第一个参数指定execution policy
txu3uszq2#
你的平台是什么?你可以看看OpenMP,虽然它不是C的一部分。但是它得到了编译器的广泛支持。
至于基于范围的for循环,参见例如Using OpenMP with C++11 range-based for loops?。
我也在http://www.open-std.org上看到过一些文档,表明了将并行结构/算法合并到未来C中的一些努力,但不知道它们的当前状态。
添加一些示例代码:
线程数可以在运行时通过
OMP_NUM_THREADS
环境变量设置。hof1towb3#
使用C++11,你可以用几行代码并行化一个for循环。
我的函数
parallel_for()
(在后面的文章中定义)将一个for循环分割成更小的块(子循环),并将每个块分配给一个线程。我的
parallel_for()
也可以在类中工作:最后这里是
parallel_for()
的实现,只需粘贴一个头文件,随意使用即可:最后,您可以定义宏,以获得更紧凑的表达式:
现在转换序列:
只不过是一个做的问题:
wwtsj6pe4#
如果您愿意让
C++
运行时控制并行性,std::async
可能非常适合这里。示例来自cppreference.com:
9ceoxa925#
这可以使用
threads
库函数来完成,具体来说就是pthreads
库函数,它可以用来并发地执行操作。您可以在这里阅读更多关于他们的信息:http://www.tutorialspoint.com/cplusplus/cpp_multithreading.htm
也可以使用std::thread:http://www.cplusplus.com/reference/thread/thread/
下面的代码使用每个线程的线程ID将数组分成两半:
还要记住编译时必须使用
-lpthread
标志。Ideone上的解决方案链接:http://ideone.com/KcsW4P
w3nuxt5m6#
因为这个线程几乎是我每次寻找并行化方法时的答案,所以我决定在arkan的方法基础上再加一点(见他的答案)。
下面的两个方法几乎是相同的,并且语法简单。只需在项目中包含头文件,然后调用其中一个并行版本:
示例:
头文件-par_for. h:
一些简单的测试表明,使用async的方法速度更快,可能是因为标准库控制是否实际启动新线程。
kcrjzv8t7#
Concurrency::parallel_for(PPL)也是实现任务并行的一个不错的选项。
取自C++ Coding Exercise – Parallel For – Monte Carlo PI Calculation
vmdwslir8#
从C17开始,std::for_each有允许parallel execution的重载。然而,在我的例子中,我的算法需要特定数量的线程来优化执行,而VS 2022中的
std::for_each
实现使用基于std::thread::hardware_concurrency的线程数量。对于那些希望能够控制并行工作者数量的人来说,this simple implementation的行为应该与
std::for_each
类似,而不需要C17: