gpt4 book ai didi

c++ - 零MQ与C++绑定(bind)和Open MP阻止问题。为什么?

转载 作者:行者123 更新时间:2023-11-30 05:13:49 25 4
gpt4 key购买 nike

我为ZeroMQ设置了wrote a test,以说服自己,它能够独立于处理顺序将回复映射到客户端,这将证明它是线程安全的。

它是一个多线程服务器,仅将接收到的消息返回给发件人。客户端从多个线程发送一些消息,并检查是否收到了相同的消息。对于多线程,我使用OpenMP。

该测试工作正常,我想继续使用zerotQ的oj​​it_a重新实现它。现在,它不再以相同的方式工作。

这是ZMQPP的代码:

#include <gtest/gtest.h>
#include <zmqpp/zmqpp.hpp>
#include <zmqpp/proxy.hpp>

TEST(zmqomp, order) {
zmqpp::context ctx;

std::thread proxy([&ctx] {
zmqpp::socket dealer(ctx, zmqpp::socket_type::xrequest);
zmqpp::socket router(ctx, zmqpp::socket_type::xreply);
router.bind("tcp://*:1234");
dealer.bind("inproc://workers");
zmqpp::proxy(router, dealer);
});

std::thread worker_starter([&ctx] {
#pragma omp parallel
{
zmqpp::socket in(ctx, zmqpp::socket_type::reply);
in.connect("inproc://workers");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string request;
in.receive(request);
in.send(request);
}
}
});

std::thread client([&ctx] {
#pragma omp parallel
{
zmqpp::socket out(ctx, zmqpp::socket_type::request);
out.connect("tcp://localhost:1234");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string msg("Request " + std::to_string(i));
out.send(msg);
std::string reply;
out.receive(reply);

EXPECT_EQ(reply, msg);
}
}
});

client.join();
worker_starter.join();
ctx.terminate();
proxy.join();
}

测试会阻塞并且不会执行到最后。我玩了一下 #pragma,发现只有一个更改可以“修复”它:
//#pragma omp parallel for
for (int i = 0; i < 250; i++) {

在这种情况下,代码仍然可以并行执行,但是我必须将循环执行次数除以我的多个物理内核。

有人知道这里发生了什么吗?

最佳答案

序言:ZeroMQ是按定义和设计的,不是线程安全的。

通常,这并不重要,因为有一些安全的设计方法,但是一旦遵循了建议的 TEST(){...} 设计,这里的情况就更糟了。

与ZeroMQ一起度过了一段时间之后,您的提案由于违反了几项主要原则而大吃一惊,否则,它们将使分布式架构比单片代码的纯 SEQ 更智能地工作。

ZeroMQ在(几乎)每三段中说服避免共享资源。 零共享是ZeroMQ出色的可扩展性能和最小化的延迟最大化之一,简而言之。

因此,最好完全避免共享 zmq.Context() 实例(除非人们非常了解,为什么以及如何在后台运行)。

因此,尝试并行触发1000次(几乎)(不是真正的 PAR )将一些事件流发送到zmq.Context的共享实例上(使用默认参数实例化且没有性能调整适应的事件越少)肯定会遭受与建议的性能和设计相反的工作。

有哪些制约因素,而不是 flutter 朔迷离?

1)每个zmq.Context() 实例都有数量有限的I / O线程,这些线程是在实例化过程中创建的。一旦合理的设计需要一些性能调整,就有可能增加这样的I / O线程数,并且数据泵将更好地工作(当然,没有任何数量的数据泵可以挽救不良的设计,灾难性的设计就越少) /分布式计算系统的体系结构()。

2)每个zmq.Socket() 实例都有一个{显式}映射到相应的I / O线程( Ref. 1) )。一旦合理的设计需要增强的鲁棒性,以应对缓慢的事件循环处理或数据流 Storm (或负载平衡,或您所说的)引起的其他不利影响,就有机会受益于分而治之的方法使用 .setsockopt( zmq.AFFINITY, ... ) 方法将每个zmq.Socket()实例直接映射到各自的I / O线程,从而在实际操作期间始终控制哪些缓冲和内部队列为哪些资源而战。在任何情况下,如果线程总数超过了本地主机的内核数,那么 just-CONCURRENT调度就很明显(因此,梦见真正的 PAR 执行梦想基本上是无意间丢失的。 )。

3)每个zmq.Socket() 还具有一对“隐藏队列破坏者” ,称为高水印。这些设置要么{隐式|明确地},后者无疑是性能调整的更明智的方式。为什么要破坏者?因为这些可以稳定并保护分布式计算系统免于溢出,并且允许简单地丢弃 HWM 级别以上的每条消息,以保护系统即使在 Storm 下也能永远运行,虚假爆炸数据包或DDoS类型的攻击。有许多工具可用于调整ZeroMQ Context()实例的行为的这一域,这超出了此答案的范围(请参阅:我在ZeroMQ AFFINITY上的其他文章或.setsockopt()方法中使用的ZeroMQ API规范)。

4)每个基于tcp://传输类zmq.Socket()实例还继承了一些与O / S相关的遗产。一些操作系统通过扩展IP数据包(在任何ZeroMQ控件之外)直至达到某个阈值,来证明存在这种风险,因此应谨慎设计此类情况,以避免对预期的应用程序信令产生不利影响/消息传递动态和健壮性,以应对这种不可控制的(外部系统)缓冲习惯。

5)每个 .recv() .send() 方法调用都是按定义阻塞的,这是大规模分布式计算系统永远都不应冒险的事情。永远不能。即使以教科书为例。而是使用这些调用的非阻塞形式。总是。 这被授予。

6)每个zmq.Socket() 实例都应执行一组仔细而优雅的终止步骤。预防性的步骤 .setsockopt( zmq.LINGER, 0 ) +一个显式的 .close() 方法被公平地包含在每个用例中(并且无论是否出现任何异常,都具有很强的执行力)。穷人{自我|这种做法的团队纪律是,这肯定会使整个应用程序基础架构挂断,因为只是没有对强制性的资源管理策略给予应有的关注。这是任何严肃的分布式计算项目的必备部分。甚至教科书的例子也应该有这个。没有异常(exception)。别找借口。 这被授予。

关于c++ - 零MQ与C++绑定(bind)和Open MP阻止问题。为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43749274/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com