gpt4 book ai didi

c++ - 如何验证运行时失败是否是由于生成的线程过多造成的?

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:30:11 25 4
gpt4 key购买 nike

我正在使用 std::thread 和 gcc 作为我的编译器来实现并行合并,如 Cormen's Introduction to Algorithms 中所述。 .

我想我得到了可以工作的代码。它传递所有不太大的随机种子数组。但是,当我尝试合并两个大数组(每个数组 1e6 个元素)时,我得到以下终止:

terminate called without an active exception
terminate called recursively
terminate called recursively

使用 gdb 没有帮助:它在运行期间会损坏。

我很确定由于产生了太多线程而导致运行失败。

我该怎么做才能确认此错误是由于生成的 std::threads 过多造成的?

注意事项

  1. 代码工作到 n=1e4,失败到 n=1e5
  2. 如果您想查看输出,请定义 DEBUG,但我不建议这样做,除非 n 很小,例如 10 或 50。

  3. STRBUF_SIZE/fprintf 的使用很丑陋,但 iostream 不能很好地在线程中刷新 - 这是 hacky,但有效(无需关注此处)。
  4. 我尝试按照 Barnes53 的建议通过在线程周围使用 try/catch block 来尝试,但这显然不起作用。
  5. 我知道产生大量的线程是一件坏事 - 在这一点上,我只是想实现书中的内容,看看它是否有效,也许还能发现它的局限性。

更新

  1. GManNickG's answer below帮助:不是每次运行,但在 1e5 的某些运行中,我可以看到,资源确实消失了。
  2. 我可能会研究某种 k 路并行排序,如果此算法无法挽救,我可以控制生成的线程数。

代码

#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>

#define STRBUF_SIZE 1024

class Random
{
public:
Random( unsigned int seed=::time(nullptr))
: m_seed( seed )
{ }
// between [ 0 .. n-1 ]
unsigned int rand_uint( unsigned int n )
{
return static_cast<unsigned int>
(static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
}
unsigned int getSeed() const { return m_seed; }
private:
unsigned int m_seed;
};

template<typename T>
char* dump( char* line, T it1, T it2 )
{
char buf[80];
line[0] = '\0';
for( T it=it1; it!=it2; ++it )
{
sprintf( buf, "%u ", *it );
strcat( line, buf );
}
return line;
}

template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
auto low = beg;
auto high = std::max( beg, end ); // end+1
while( low < high )
{
auto mid = low + std::distance( low, high ) / 2;
if ( value <= *mid )
high = mid;
else
low = mid + 1;
}
return high;
}

template< class InputIt, class OutputIt >
void p_merge(
char const* msg,
unsigned depth,
unsigned parent_lvl_id,
unsigned lr,
InputIt p1, InputIt r1,
InputIt p2, InputIt r2,
OutputIt p3, OutputIt r3
)
{
#ifdef DEBUG
char buff[STRBUF_SIZE];
#endif
unsigned sum_prev = pow( 2, depth ) - 1;
unsigned lvl_id = 2*parent_lvl_id + lr;
unsigned thread_no = sum_prev + lvl_id + 1;

unsigned limit0 = sum_prev + 1;
unsigned limit1 = pow( 2, depth+1 ) - 1;

#ifdef DEBUG
char msg_dep[256];
sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
fprintf( stderr, "%s\n", msg_dep );
#endif

if ( thread_no<limit0 || thread_no>limit1 )
{
fprintf( stderr, "OUT OF BOUNDS\n" );
exit( 1 );
}

auto n1 = std::distance( p1, r1 );
auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
fprintf( stderr, "%s dist[v1]=%2ld : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
fprintf( stderr, "%s dist[v2]=%2ld : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
if ( n1<n2 )
{
std::swap( p1, p2 );
std::swap( r1, r2 );
std::swap( n1, n2 );
#ifdef DEBUG
fprintf( stderr, "%s swapped[v1] : %s\n", msg_dep, dump( buff, p1, r1 ));
fprintf( stderr, "%s swapped[v2] : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
}
if ( n1==0 )
{
#ifdef DEBUG
fprintf( stderr, "%s done \n", msg_dep );
#endif
return;
}
auto q1 = p1 + n1 / 2; // midpoint
auto q2 = binary_search_it( p2, r2, *q1 ); // <q1 q2[q1] >=q1
auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
*q3 = *q1;

#ifdef DEBUG
fprintf( stderr, "%s q1[median]=%u : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
fprintf( stderr, "%s q3(copied)=%u : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif

#ifdef DEBUG
auto d1 = std::distance( p1, q1-1 );
auto d2 = std::distance( q1+1, r1 );
fprintf( stderr, "%s q1[dist_L]=%ld : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q1[dist_M]=%ld : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif


try {
std::thread t1{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
std::thread t2{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
t1.join();
t2.join();
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}

#ifdef DEBUG
fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}

int
main( int argv, char* argc[] )
{
// ok up to 1e4, fails by 1e5
unsigned n = 1e5;
Random r;
std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );

#ifdef DEBUG
fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif

std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );

#ifdef DEBUG
char buff[STRBUF_SIZE];
fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif

std::sort( v1.begin(), v1.end() );
std::sort( v2.begin(), v2.end() );

p_merge( "TOP ", 0, 0, 0,
v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );

assert( std::is_sorted( v3.begin(), v3.end() ));

#ifdef DEBUG
fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}

最佳答案

您可以捕获 std::system_error 并检查代码是否为 resource_unavailable_try_again:

#include <atomic>
#include <iostream>
#include <system_error>
#include <thread>
#include <vector>

class thread_collection
{
public:
thread_collection() :
mStop(false)
{}

~thread_collection()
{
clear();
}

template <typename Func, typename... Args>
bool add(Func&& func, Args&&... args)
{
try
{
mThreads.emplace_back(std::forward<Func>(func),
std::cref(mStop),
std::forward<Args>(args)...);
}
catch (const std::system_error& e)
{
if (e.code().value() == std::errc::resource_unavailable_try_again)
return false; // not possible to make more threads right now
else
throw; // something else
}

return true; // can keep going
}

void clear()
{
mStop = true;
for (auto& thread : mThreads)
{
if (thread.joinable())
thread.join();
}

mThreads.clear();
mStop = true;
}

std::size_t size() const
{
return mThreads.size();
}

private:
thread_collection(const thread_collection&);
thread_collection& operator=(const thread_collection&);

std::atomic<bool> mStop;
std::vector<std::thread> mThreads;
};

void worker(const std::atomic<bool>& stop)
{
while (!stop)
std::this_thread::yield();
}

int main()
{
thread_collection threads;

try
{
while (threads.add(worker))
continue;

std::cout << "Exhausted thread resources!" << std::endl;
}
catch (const std::exception& e)
{
std::cout << "Stopped for some other reason: " << e.what() << std::endl;
}

std::cout << "Made: " << threads.size() << " threads." << std::endl;
threads.clear();
}

(运行此程序需要您自担风险!)

根据§30.3.1.2/4,这是用于指示线程创建失败的错误代码:

Error conditions:
resource_unavailable_try_again — the system lacked the necessary resources to create another thread, or the system-imposed limit on the number of threads in a process would be exceeded.

请注意,这可能是由于您自己的参数被复制到生成的线程而引发的。为了防止这种情况发生,您需要预先构造您的参数,然后将它们不抛出移动到您的线程函数。

也就是说,无论如何,您最好不要限制线程创建。运行的线程多于内核可以执行的线程是没有意义的。使用 std::thread::hardware_concurrency 获取该数字。

关于c++ - 如何验证运行时失败是否是由于生成的线程过多造成的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14408796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com