gpt4 book ai didi

c++ - 动态生成和安全使用spsc_queues

转载 作者:太空宇宙 更新时间:2023-11-04 11:35:32 27 4
gpt4 key购买 nike

我制作的唯一boost::lockfreespsc_queue,这太神奇了。

但是,我想在一个线程与cores - 1线程来回传递信息的地方实现它。

我当时在想,每个工作线程都有自己的spsc_queues集合,该集合将存储在vector s中,在那里主线程将信息传递到一个传出队列,然后移至vector中的下一个队列,并等等,以及循环进入的队列。

可以安全地推送和弹出两个spsc_queue中的这些vector吗?

如果没有,是否有根据我的意图使用spsc_queues的替代方法?

最佳答案

基本上,您建议以预期的方式使用2x(cores-1)spsc_queues。是的,这可以工作。

我看不出您将如何处理主线程上的响应(“传入队列”)。实话实说,传入队列上没有“等待”操作,您也不想要一个(不再是非常无锁的,并且在等待传入消息时,所有其他工作人员都不会得到服务)。

Aside: If you dimension your response queues such that they will never overflow, then you could get a long way with naive-roundrobin reading from it (just don't attempt to read all messages from a single response queue, because this is a sure-fire way to get scheduling starvation for the other response queues).

Code sample at the bottom (CODE SAMPLE)



所有这些使我强烈怀疑您实际上是 异​​步,而不是 并发。我有一种让您的应用程序在1个线程上运行的感觉,这是非常高兴的,只是尽快“尽可能地”服务每条可用消息(无论消息的来源或内容如何)。
  • 对于可以在非常短的时间内处理的大量小消息** [¹] **,此模型将很好地扩展。
  • 当1个线程饱和时,可以通过添加worker进行扩展。
  • 在具有需要大量处理的消息的服务中,您可以以异步的方式在仅处理低频请求的专用线程上分担这些任务:它们只需将小的“完成”消息推回主工作队列即可他们完成了。

  • 所有这些使我想到了libuv或Boost Asio之类的库。如果您已经一手掌握了您将需要无锁运行来获得所需吞吐量的信息(这在工业强度服务器解决方案中很少见),则可以使用无锁队列进行模拟。这需要做更多的工作,因为您必须将epoll / select / poll循环集成到生产者中。我建议您保持简单,简单,并仅在实际需要时才采用附加的复杂性。

    Mantra: correct, well-factored first; optimize later



    (请注意此处的“构造良好”。在这种情况下,这意味着您将/不/允许在高吞吐量队列上执行缓慢的处理任务。)

    代码样本

    如所 promise 的,一个简单的概念证明显示了使用带有多个工作线程的多个双向SPSC队列消息传递。

    完全无锁定的版本: Live On Coliru

    这里有很多微妙之处。特别要注意的是,队列 的维数不足会如何导致导致静默丢弃的消息。如果消费者能够跟上生产者的步伐,这将不会发生,但是只要您不知道操作系统的 Activity ,您就应该对此进行检查。

    更新根据注释中的请求,这是一个检查队列饱和度的版本-不丢弃消息。 也是Live On Coliru还是
  • 不能删除任何消息
  • 没有更多的延迟到达(因为直到收到所有响应才退出主循环)
  • 循环不再绑定(bind)到循环变量,因为发送可能会停顿,这将导致始终读取同一响应队列。这是死锁或其他最坏情况下性能的秘诀。
  • 在队列饱和的情况下,我们必须考虑一种平衡负载的适当方法。我选择了小睡。从技术上讲,这意味着当队列饱和时,我们的无锁免等待解决方案将降级为常规协作多线程。如果检测到这种情况,也许您希望增加队列。这一切都取决于您的系统。
  • 您将想知道什么时候发生。我包括了所有线程的简单拥塞统计信息。在我的系统上,通过microsleep调用sleep_for(nanoseconds(1)),输出为:
    Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )

    Total: 1048576 responses/1048576 requests
    Main thread congestion: 21.2%
    Worker #0 congestion: 1.7%
    Worker #1 congestion: 3.1%
    Worker #2 congestion: 2.0%
    Worker #3 congestion: 2.5%
    Worker #4 congestion: 4.5%
    Worker #5 congestion: 2.5%
    Worker #6 congestion: 3.0%
    Worker #7 congestion: 3.2%
    Worker #8 congestion: 3.1%
    Worker #9 congestion: 3.6%

    real 0m0.616s
    user 0m3.858s
    sys 0m0.025s

    如您所见,Coliru的调音必须大不相同。只要您的系统存在以最大负载运行的风险,就需要进行此调整。
  • 相反,您必须考虑在队列为空时如何限制负载:此刻,工作人员将仅在队列上忙循环,等待消息出现。在真实的服务器环境中,当负载突然爆发时,您将需要检测“空闲”时间段并降低轮询频率,以节省CPU功耗(同时允许CPU最大化其他线程的吞吐量)。
    此答案中包括第二个“混合”版本(在队列饱和之前是无锁的):
    #include <boost/lockfree/spsc_queue.hpp>
    #include <boost/scoped_ptr.hpp>
    #include <boost/thread.hpp>
    #include <memory>
    #include <iostream>
    #include <iterator>

    namespace blf = boost::lockfree;

    static boost::atomic_bool shutdown(false);

    static void nanosleep()
    {
    //boost::this_thread::yield();
    boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
    }

    struct Worker
    {
    typedef blf::spsc_queue<std::string > queue;
    typedef std::unique_ptr<queue> qptr;
    qptr incoming, outgoing;
    size_t congestion = 0;

    Worker() : incoming(new queue(64)), outgoing(new queue(64))
    {
    }

    void operator()()
    {
    std::string request;
    while (!shutdown)
    {
    while (incoming->pop(request))
    while (!outgoing->push("Ack: " + request))
    ++congestion, nanosleep();
    }
    }
    };

    int main()
    {
    boost::thread_group g;

    std::vector<Worker> workers(10);
    std::vector<size_t> responses_received(workers.size());

    for (auto& w : workers)
    g.create_thread(boost::ref(w));

    // let's give them something to do
    const auto num_requests = (1ul<<20);
    std::string response;
    size_t congestion = 0;

    for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
    {
    if (total_sent < num_requests)
    {
    // send to a random worker
    auto& to = workers[rand() % workers.size()];
    if (to.incoming->push("request " + std::to_string(total_sent)))
    ++total_sent;
    else
    congestion++;
    }

    if (total_received < num_requests)
    {
    static size_t round_robin = 0;
    auto from = (++round_robin) % workers.size();
    if (workers[from].outgoing->pop(response))
    {
    ++responses_received[from];
    ++total_received;
    }
    }
    }

    auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
    std::cout << "\nReceived " << sum << " responses (";
    std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
    std::cout << ")\n";

    shutdown = true;
    g.join_all();

    std::cout << "\nTotal: " << sum << " responses/" << num_requests << " requests\n";

    std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0*congestion/num_requests) << "%\n";

    for (size_t idx = 0; idx < workers.size(); ++idx)
    std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1) << (100.0*workers[idx].congestion/responses_received[idx]) << "%\n";
    }

    [¹] 与以往一样,“非常少的时间”是一个相对的概念,大致意味着“比新消息之间的平均时间短的时间”。例如。如果您每秒有100个请求,那么对于单线程系统,5ms的处理时间将“很少”。但是,如果每秒有1万个请求,则1毫秒的处理时间大约是16核服务器上的限制。

    关于c++ - 动态生成和安全使用spsc_queues,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23183629/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com