gpt4 book ai didi

c++ - 并行 for_each 比 std::for_each 慢两倍以上

转载 作者:可可西里 更新时间:2023-11-01 17:55:38 28 4
gpt4 key购买 nike

我正在阅读 C++ Concurrency in Action 安东尼·威廉姆斯。在关于设计并发代码的章节中有并行版本的std::for_each。算法。这是本书中略微修改的代码:

join_thread.hpp

#pragma once

#include <vector>
#include <thread>

class join_threads
{
public:
explicit join_threads(std::vector<std::thread>& threads)
: threads_(threads) {}

~join_threads()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
if(threads_[i].joinable())
{
threads_[i].join();
}
}
}

private:
std::vector<std::thread>& threads_;
};

parallel_for_each.hpp

#pragma once

#include <future>
#include <algorithm>

#include "join_threads.hpp"

template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func func)
{
const auto length = std::distance(first, last);
if (0 == length) return;

const auto min_per_thread = 25u;
const unsigned max_threads = (length + min_per_thread - 1) / min_per_thread;

const auto hardware_threads = std::thread::hardware_concurrency();

const auto num_threads= std::min(hardware_threads != 0 ?
hardware_threads : 2u, max_threads);

const auto block_size = length / num_threads;

std::vector<std::future<void>> futures(num_threads - 1);
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);

auto block_start = first;
for (unsigned i = 0; i < num_threads - 1; ++i)
{
auto block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void (void)> task([block_start, block_end, func]()
{
std::for_each(block_start, block_end, func);
});
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}

std::for_each(block_start, last, func);

for (size_t i = 0; i < num_threads - 1; ++i)
{
futures[i].get();
}
}

我用 std::for_each 的顺序版本对其进行了基准测试使用以下程序:

main.cpp

#include <iostream>
#include <random>
#include <chrono>

#include "parallel_for_each.hpp"

using namespace std;

constexpr size_t ARRAY_SIZE = 500'000'000;
typedef std::vector<uint64_t> Array;

template <class FE, class F>
void test_for_each(const Array& a, FE fe, F f, atomic<uint64_t>& result)
{
auto time_begin = chrono::high_resolution_clock::now();
result = 0;
fe(a.begin(), a.end(), f);
auto time_end = chrono::high_resolution_clock::now();

cout << "Result = " << result << endl;
cout << "Time: " << chrono::duration_cast<chrono::milliseconds>(
time_end - time_begin).count() << endl;
}

int main()
{
random_device device;
default_random_engine engine(device());
uniform_int_distribution<uint8_t> distribution(0, 255);

Array a;
a.reserve(ARRAY_SIZE);

cout << "Generating array ... " << endl;
for (size_t i = 0; i < ARRAY_SIZE; ++i)
a.push_back(distribution(engine));

atomic<uint64_t> result;
auto acc = [&result](uint64_t value) { result += value; };

cout << "parallel_for_each ..." << endl;
test_for_each(a, parallel_for_each<Array::const_iterator, decltype(acc)>, acc, result);
cout << "for_each ..." << endl;
test_for_each(a, for_each<Array::const_iterator, decltype(acc)>, acc, result);

return 0;
}

我机器上算法的并行版本比顺序版本慢两倍以上:

parallel_for_each ...
Result = 63750301073
Time: 5448
for_each ...
Result = 63750301073
Time: 2496

我在 Ubuntu Linux 上使用 GCC 6.2 编译器,运行在 Intel(R) Core(TM) i3-6100 CPU @ 3.70GHz.

如何解释这种行为?这是因为分享了atomic<uint64_t>吗线程和缓存乒乓之间的变量?

我分别使用 perf 对两者进行了概要分析。对于并行版本,统计信息如下:

 1137982167      cache-references                                            
247652893 cache-misses # 21,762 % of all cache refs
60868183996 cycles
27409239189 instructions # 0,45 insns per cycle
3287117194 branches
80895 faults
4 migrations

对于顺序的:

  402791485      cache-references                                            
246561299 cache-misses # 61,213 % of all cache refs
40284812779 cycles
26515783790 instructions # 0,66 insns per cycle
3188784664 branches
48179 faults
3 migrations

很明显,并行版本会产生更多的缓存引用、周期和故障,但为什么呢?

最佳答案

共享相同的result变量:所有线程都在atomic<uint64_t> result上累积,抖动缓存!

每次一个线程写入result , 其他核心中的所有缓存都无效:这导致缓存行争用

更多信息:

  • "Sharing Is the Root of All Contention" .

    [...] to write to a memory location a core must additionally have exclusive ownership of the cache line containing that location. While one core has exclusive use, all other cores trying to write the same memory location must wait and take turns — that is, they must run serially. Conceptually, it's as if each cache line were protected by a hardware mutex, where only one core can hold the hardware lock on that cache line at a time.

  • This article on "false sharing"涵盖了类似的问题,更深入地解释了缓存中发生的情况。


我对您的程序进行了一些修改并取得了以下结果(在具有 i7-4770K [8 线程 + 超线程] 的机器上):

Generating array ...
parallel_for_each ...
Result = 63748111806
Time: 195
for_each ...
Result = 63748111806
Time: 2727

并行版本比串行版本快大约 92%


  1. std::futurestd::packaged_task重量级抽象。在这种情况下,一个 std::experimental::latch 就足够了。

  2. 每个任务都发送到线程池,这最大限度地减少了线程创建开销

  3. 每个任务都有自己的累加器。这消除了共享

代码可用here on my GitHub .它使用了一些个人依赖项,但无论如何您都应该理解这些变化。


以下是最重要的变化:

// A latch is being used instead of a vector of futures.
ecst::latch l(num_threads - 1);

l.execute_and_wait_until_zero([&]
{
auto block_start = first;
for (unsigned i = 0; i < num_threads - 1; ++i)
{
auto block_end = block_start;
std::advance(block_end, block_size);

// `p` is a thread pool.
// Every task posted in the thread pool has its own `tempacc` accumulator.
p.post([&, block_start, block_end, tempacc = 0ull]() mutable
{
// The task accumulator is filled up...
std::for_each(block_start, block_end, [&tempacc](auto x){ tempacc += x; });

// ...and then the atomic variable is incremented ONCE.
func(tempacc);
l.decrement_and_notify_all();
});

block_start = block_end;
}

// Same idea here: accumulate to local non-atomic counter, then
// add the partial result to the atomic counter ONCE.
auto tempacc2 = 0ull;
std::for_each(block_start, last, [&tempacc2](auto x){ tempacc2 += x; });
func(tempacc2);
});

关于c++ - 并行 for_each 比 std::for_each 慢两倍以上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40805197/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com