c++11 - How to read data from the Internet with multiple threads while connecting only once?

Reposted. Author: 行者123. Updated: 2023-12-04
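I'm building a small multi-threaded downloader with boost::asio::ip::tcp, and I need each thread to handle one part of the data. I know this can be done by adding "Range: bytes=xx-xx" to the request header, but I don't want the program to connect to the server multiple times. Is there any solution?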



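As a demonstration, let's say you read all the primes from a website, in chunks, and then do some processing on all of them on separate threads: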



void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx; // static: one mutex shared by all worker threads

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(std::uniform_int_distribution<>(100, 200)(prng) * 1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}



Okay, I went a bit overboard here, because the site not only uses https (ugh) but also serves a ZIP file (ugh). And we're using C++ (ugh?).

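At least we can do the whole SSL connection business synchronously in not much code. We do want the reading to be asynchronous, though, because that way we can demonstrate that
  • you can do a lot of intermixed IO on the main thread using just Boost Asio
  • the same goes for Boost Process, launching zcat as a child process to decompress the primes content (we'll assume a UNIX-like system with zcat installed)
  • which means we'll be writing to that child process's stdin asynchronously
  • and reading its stdout asynchronously
  • while spawning off batch jobs as soon as they are ready

This should be a pretty good model for your workload, because the workers take more time than the IO, yet we perform many IO tasks on a single thread without blocking.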
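The division of labour described above (one thread owns the single connection and all the IO, a pool of workers owns the processing) can be modelled with the standard library alone. This is a minimal sketch, not the answer's code: the hypothetical BatchQueue stands in for net::thread_pool, a std::istringstream stands in for the one connection, and process_once and its parameters are illustrative names. Only the calling thread ever reads the source; each batch is moved to a worker, so nothing but the queue is shared.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <optional>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical work queue standing in for net::thread_pool in this sketch.
class BatchQueue {
public:
    void push(std::vector<std::size_t> batch) {
        {
            std::lock_guard<std::mutex> lk(mx_);
            q_.push(std::move(batch));
        }
        cv_.notify_one();
    }
    void close() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            closed_ = true;
        }
        cv_.notify_all();
    }
    // Blocks until a batch is available; returns nullopt once closed and drained.
    std::optional<std::vector<std::size_t>> pop() {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return closed_ || !q_.empty(); });
        if (q_.empty()) return std::nullopt;
        std::vector<std::size_t> batch = std::move(q_.front());
        q_.pop();
        return batch;
    }

private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::queue<std::vector<std::size_t>> q_;
    bool closed_ = false;
};

// The calling thread reads `input` exactly once and cuts it into batches of
// `batch_size` numbers; `workers` threads sum the batches. Returns the grand total.
unsigned long long process_once(const std::string& input, std::size_t batch_size, unsigned workers) {
    BatchQueue queue;
    std::mutex total_mx;
    unsigned long long total = 0;

    std::vector<std::thread> pool;
    for (unsigned i = 0; i < workers; ++i)
        pool.emplace_back([&] {
            while (auto job = queue.pop()) {
                auto sum = std::accumulate(job->begin(), job->end(), 0ull);
                std::lock_guard<std::mutex> lk(total_mx);
                total += sum;
            }
        });

    std::istringstream source(input); // stands in for the single connection
    std::vector<std::size_t> batch;
    std::size_t value;
    while (source >> value) {
        batch.push_back(value);
        if (batch.size() == batch_size) {
            queue.push(std::move(batch)); // ownership moves to whichever worker pops it
            batch.clear(); // a moved-from vector is valid but unspecified; make it empty
        }
    }
    if (!batch.empty()) queue.push(std::move(batch));

    queue.close();
    for (auto& t : pool) t.join();
    return total;
}
```

For example, process_once("2 3 5 7 11 13 17", 3, 2) reads the input once, hands out the batches {2, 3, 5}, {7, 11, 13} and {17}, and returns 58.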


    As said, we'll use a single thread for all the IO, and a thread pool for the batch workers:
    int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads

    There. That's a start. Now we want an SSL connection, and to request that ZIP. Here it is:
    // outside for lifetime
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0}; // for download content
    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    { // synchronously write request
        std::string host = "";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/"));

        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }

    Yes, that already completes reading the response headers¹. Of course, we cheated, because we need three helpers:
  • making the SSL context
    auto ssl_context() {
        ssl::context ctx{ssl::context::sslv23};
        // note: no certificate verification is configured here
        return ctx;
    }
  • connecting over SSL
    void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
        net::connect(s.lowest_layer(), eps);

        // SNI: tell the server which host we want before the handshake
        if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
            throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
        }
        s.handshake(ssl::stream_base::client);
    }
  • making the HTTP request
    auto get_request(std::string const& host, std::string const& path) {
        using namespace http;
        request<string_body> req;
        req.method(verb::get);
        req.target("https://" + host + path);
        req.set(field::user_agent, "test");
        req.set(field::host, host);

        std::cerr << req << std::endl;
        return req;
    }
    Not bad, for C++.
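To see roughly what these helpers put on the wire, and why lookahead has to be kept around, here is a standard-library-only sketch. format_get_request and split_header are hypothetical names; the first is a hand-written approximation of the request get_request builds (Beast's serializer may differ in detail), and the response text in the test is made up. The second function illustrates the point behind lookahead: reading a header can pull in bytes that already belong to the body, and Beast's documentation for read_header says such surplus bytes are stored in the dynamic buffer, which must be preserved for subsequent reads.

```cpp
#include <optional>
#include <string>
#include <utility>

// Hypothetical: approximates the bytes get_request() produces, using the same
// absolute-form target ("https://" + host + path) and the same two fields.
std::string format_get_request(const std::string& host, const std::string& path) {
    std::string msg = "GET https://" + host + path + " HTTP/1.1\r\n"; // request line
    msg += "User-Agent: test\r\n";
    msg += "Host: " + host + "\r\n";
    msg += "\r\n"; // blank line ends the header block; this GET has no body
    return msg;
}

// Hypothetical: splits raw received bytes into the header block and whatever body
// bytes arrived with it. Returns nullopt while the blank line has not been seen yet.
std::optional<std::pair<std::string, std::string>> split_header(const std::string& raw) {
    auto pos = raw.find("\r\n\r\n");
    if (pos == std::string::npos) return std::nullopt; // need more data
    return std::make_pair(raw.substr(0, pos + 4), // header, including the blank line
                          raw.substr(pos + 4));   // start of the body: must not be dropped
}
```

With a response such as "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nabc", split_header yields the header block plus the body prefix "abc"; discarding that second part is the kind of mistake that would corrupt the ZIP stream.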

    Piping it into zcat
    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
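    receive_zip is what we call our loop. It is a self-chaining asynchronous operation: every time it is invoked, it pumps some data into the pipe and issues one more async_read for the HTTP response:
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
        (error_code ec, size_t /*ignore_this*/) {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) { // nothing to forward on the priming call
            auto n = buf.size() - body.size; // buffer_body shrinks size by the bytes it stored
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done = done || response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close(); // EOF on zcat's stdin lets the child finish
        } else {
            body.data = buf.data();
            body.size = buf.size();

            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };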

    This slightly complicated-looking way of reading a buffered response is taken almost literally from the Beast documentation.

    Now all we have to do is prime the pump:
    // kick off receive loop
    receive_zip(error_code{}, 0);
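The mechanics of receive_zip can be tried without sockets, SSL or Beast. In the sketch below everything is a hypothetical stand-in: BodyWindow mimics the data/size pair of buffer_body, FakeReader::read_some plays http::async_read by filling the window, advancing data and shrinking size, then queuing the handler instead of calling it, and a std::deque of callbacks plays io_context. Error handling and the need_buffer case are left out. It shows the two ideas from the text: the handler re-arms itself through a std::function that refers to itself, and the number of bytes to forward is the capacity minus the remaining size.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for buffer_body's value: where to write next, and how much room is left.
struct BodyWindow {
    char* data = nullptr;
    std::size_t size = 0;
};

// Hypothetical stand-in for http::async_read: copies what fits into the window, advances
// data, shrinks size, and queues the handler instead of calling it directly.
struct FakeReader {
    std::string source;
    std::size_t pos = 0;
    std::deque<std::function<void()>>* run_queue = nullptr;

    bool done() const { return pos == source.size(); }

    void read_some(BodyWindow& w, std::function<void()> handler) {
        std::size_t n = std::min(w.size, source.size() - pos);
        std::copy_n(source.data() + pos, n, w.data);
        pos += n;
        w.data += n;
        w.size -= n;
        run_queue->push_back(std::move(handler));
    }
};

// Pumps `input` through a fixed 4-byte buffer and returns everything that was forwarded.
std::string pump(const std::string& input) {
    std::deque<std::function<void()>> run_queue; // plays the role of io_context
    FakeReader reader{input, 0, &run_queue};
    std::array<char, 4> buf{};
    BodyWindow body;
    std::string forwarded; // plays the role of pipe_to_zcat

    std::function<void()> receive;
    receive = [&] {
        if (body.data) { // nothing to forward on the priming call
            std::size_t n = buf.size() - body.size; // bytes filled since the last re-arm
            forwarded.append(buf.data(), n);
        }
        if (!reader.done()) {
            body.data = buf.data(); // hand the whole buffer back
            body.size = buf.size();
            reader.read_some(body, receive); // re-arm: the loop chains itself
        }
    };

    receive(); // prime the pump
    while (!run_queue.empty()) { // the run() loop
        auto next = std::move(run_queue.front());
        run_queue.pop_front();
        next();
    }
    return forwarded;
}
```

With an 11-byte input the handler runs three times (4 + 4 + 3 bytes), which is the same pattern the 512-byte buf produces on the real compressed stream.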


    This isn't the interesting part, so let's move on: we launch zcat as a child process, with a second pipe to read its output from:
    process::async_pipe zcat_output(io);
    process::child zcat(
        process::search_path("zcat"),
        process::std_in < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }), io);



    Ah, the good stuff: Primes On Tap!

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;

    Like receive_zip before it, receive_primes is our loop driver. The sb buffer is just a buffer that we can read through a std::istream, just as you would normally read from std::cin.
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        std::istream is(&sb);

        size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
        std::vector<size_t> batch(n);
        std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
        is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

        post(pool, std::bind(handle_batch, std::move(batch)));

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };

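    Because async_read_until may read partial lines, we count the number of complete lines (n) in the buffer and pack them into a vector. After making sure we eat the pending newline, we post the batch job, finally: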
     post(pool, std::bind(handle_batch, std::move(batch)));

    We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.

    // kick off handler loop as well:
    receive_primes(error_code{}, 0);
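The batching step in receive_primes (count the complete lines, parse exactly that many numbers, eat the pending newline, keep any partial line for the next read) does not depend on Asio. Below it is sketched over a plain std::string instead of net::streambuf; take_complete_lines is a hypothetical name. One small difference from the copy_n/istream_iterator version: constructing an istream_iterator already performs a read, so this sketch returns early when there is no complete line rather than parsing anything at all.

```cpp
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Parses every complete '\n'-terminated line at the front of `buffer` as a number,
// removes those lines, and leaves a trailing partial line (if any) for the next read.
std::vector<std::size_t> take_complete_lines(std::string& buffer) {
    std::vector<std::size_t> batch;
    const auto last_newline = buffer.rfind('\n');
    if (last_newline == std::string::npos) return batch; // no complete line yet: parse nothing

    const std::size_t complete_len = last_newline + 1;
    const auto n = static_cast<std::size_t>(std::count(
        buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(complete_len), '\n'));
    std::istringstream is(buffer.substr(0, complete_len));

    batch.reserve(n);
    std::size_t value;
    while (batch.size() < n && is >> value) batch.push_back(value);

    buffer.erase(0, complete_len); // only the partial tail (if any) remains
    return batch;
}
```

Each returned vector is an independent batch that can be moved into a task, the way receive_primes does with std::move(batch).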


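    io.run();

    This runs both pumps and waits for the child process, all on the main thread, as we wanted.

    pool.join();
    } // end of main

    pool.join() waits for all batch jobs to complete before the thread pool is stopped. If you leave that line out, not all tasks may run, because the destructor of thread_pool calls stop() before it calls join().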
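Why the explicit pool.join() matters can be modelled with a toy pool. The hypothetical ToyPool below mimics only the contract that matters here, as documented for net::thread_pool: join() lets outstanding work finish, stop() makes the threads quit as soon as possible so pending tasks may never run, and the destructor stops and then joins. It is a simplified model, not how net::thread_pool is implemented.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy pool that models only the stop()/join() contract discussed above.
class ToyPool {
public:
    explicit ToyPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            threads_.emplace_back([this] { work(); });
    }
    ~ToyPool() { stop(); join(); } // stop first, then join, like the destructor described above

    void post(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> lk(mx_);
            q_.push(std::move(f));
        }
        cv_.notify_one();
    }

    // Lets the workers finish everything already queued, then waits for them to exit.
    void join() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            draining_ = true;
        }
        cv_.notify_all();
        for (auto& t : threads_)
            if (t.joinable()) t.join();
    }

    // Tells the workers to exit as soon as possible; tasks not yet started are dropped.
    void stop() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

private:
    void work() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mx_);
                cv_.wait(lk, [this] { return stopped_ || draining_ || !q_.empty(); });
                if (stopped_ || q_.empty()) return; // stopped, or drained while joining
                task = std::move(q_.front());
                q_.pop();
            }
            task();
        }
    }

    std::mutex mx_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    bool stopped_ = false;
    bool draining_ = false;
    std::vector<std::thread> threads_;
};
```

Calling join() first means the destructor's stop() finds an already drained, already joined pool. Without it, stop() runs first and any batch still in the queue is skipped, which is the effect described above.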

    Play around with the buffer size (512 bytes in my example) to see how large the batches become. Note that those 512 bytes are compressed bytes.


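    Sadly, as far as I know, no online compiler supports external network access, so you'll have to run this one yourself. For convenience, here is the full listing, along with sample output from a run on my machine: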

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/http.hpp>
    #include <boost/process.hpp>
    #include <boost/process/async.hpp>
    #include <algorithm>
    #include <array>
    #include <chrono>
    #include <functional>
    #include <iomanip>
    #include <iostream>
    #include <iterator>
    #include <mutex>
    #include <numeric>
    #include <random>
    #include <thread>
    #include <vector>

    void handle_batch(std::vector<size_t> params) {
        static std::mutex s_mx; // static: one mutex shared by all worker threads

        if (!params.empty()) {
            // emulate some work, because I'm lazy
            auto sum = std::accumulate(begin(params), end(params), 0ull);
            // then wait some 100..200ms
            {
                using namespace std::chrono_literals;
                std::mt19937 prng(std::random_device{}());
                std::this_thread::sleep_for(std::uniform_int_distribution<>(100, 200)(prng) * 1ms);
            }

            // simple thread id (thread::id displays ugly)
            auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

            // report results to stdout
            std::lock_guard lk(s_mx); // make sure the output doesn't intermix
            std::cout
                << "Thread #" << std::setw(2) << std::setfill('0') << tid
                << " Batch n:" << params.size()
                << "\tRange [" << params.front() << ".." << params.back() << "]"
                << "\tSum:" << sum
                << std::endl;
        }
    }

    namespace net = boost::asio;
    namespace ssl = net::ssl;
    namespace beast = boost::beast;
    namespace http = beast::http;
    namespace process = boost::process;

    using boost::system::error_code;
    using boost::system::system_error;
    using net::ip::tcp;
    using stream = ssl::stream<tcp::socket>;

    auto ssl_context() {
        ssl::context ctx{ssl::context::sslv23};
        // note: no certificate verification is configured here
        return ctx;
    }

    void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
        net::connect(s.lowest_layer(), eps);

        // SNI: tell the server which host we want before the handshake
        if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
            throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
        }
        s.handshake(ssl::stream_base::client);
    }

    auto get_request(std::string const& host, std::string const& path) {
        using namespace http;
        request<string_body> req;
        req.method(verb::get);
        req.target("https://" + host + path);
        req.set(field::user_agent, "test");
        req.set(field::host, host);

        std::cerr << req << std::endl;
        return req;
    }

    int main() {
        net::io_context io; // main thread does all io
        net::thread_pool pool(6); // worker threads

        // outside for lifetime
        http::response_parser<http::buffer_body> response_reader;
        beast::flat_buffer lookahead; // for the response_reader
        std::array<char,512> buf{0}; // for download content
        auto ctx = ssl_context();
        ssl::stream<tcp::socket> s(io, ctx);

        { // synchronously write request
            std::string host = "";
            connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
            http::write(s, get_request(host, "/includes/"));

            http::read_header(s, lookahead, response_reader);
            //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
        }

        // now, asynchronously read contents
        process::async_pipe pipe_to_zcat(io);

        std::function<void(error_code, size_t)> receive_zip;
        receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
            auto& res = response_reader.get();
            auto& body = res.body();
            if (body.data) { // nothing to forward on the priming call
                auto n = buf.size() - body.size; // buffer_body shrinks size by the bytes it stored
                net::write(pipe_to_zcat, net::buffer(buf, n));
            }

            bool done = ec && !(ec == http::error::need_buffer);
            done = done || response_reader.is_done();

            if (done) {
                std::cerr << "receive_zip: " << ec.message() << std::endl;
                pipe_to_zcat.close(); // EOF on zcat's stdin lets the child finish
            } else {
                body.data = buf.data();
                body.size = buf.size();

                http::async_read(s, lookahead, response_reader, receive_zip);
            }
        };

        // kick off receive loop
        receive_zip(error_code{}, 0);

        process::async_pipe zcat_output(io);
        process::child zcat(
            process::search_path("zcat"),
            process::std_in < pipe_to_zcat,
            process::std_out > zcat_output,
            process::on_exit([](int exitcode, std::error_code ec) {
                std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
            }), io);

        std::function<void(error_code, size_t)> receive_primes;
        net::streambuf sb;
        receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
            std::istream is(&sb);

            // only complete lines are parsed; a partial trailing line stays in sb
            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));

            if (ec) {
                std::cerr << "receive_primes: " << ec.message() << std::endl;
            } else {
                net::async_read_until(zcat_output, sb, "\n", receive_primes);
            }
        };
        // kick off handler loop as well:
        receive_primes(error_code{}, 0);

        io.run();    // runs both pumps and waits for the child, all on this thread
        pool.join(); // let all queued batch jobs finish before the pool is destroyed
    }

    GET HTTP/1.1
    User-Agent: test

    receive_zip: Success
    Child process exited with 0 (Success)
    receive_primes: End of file
    Thread #11 Batch n:95 Range [599..1237] Sum:86587
    Thread #58 Batch n:170 Range [1249..2549] Sum:320714
    Thread #34 Batch n:170 Range [2551..3919] Sum:549880
    Thread #54 Batch n:170 Range [3923..5407] Sum:790922
    Thread #30 Batch n:170 Range [5413..6863] Sum:1040712
    Thread #60 Batch n:108 Range [2..593] Sum:28697
    Thread #58 Batch n:170 Range [8429..9923] Sum:1560462
    Thread #11 Batch n:170 Range [6869..8423] Sum:1298732
    Thread #30 Batch n:146 Range [12703..14087] Sum:1956410
    Thread #34 Batch n:147 Range [9929..11329] Sum:1563023
    Thread #54 Batch n:146 Range [11351..12697] Sum:1758964
    Thread #60 Batch n:146 Range [14107..15473] Sum:2164462
    Thread #11 Batch n:146 Range [16943..18313] Sum:2576764
    Thread #34 Batch n:146 Range [19861..21313] Sum:3003048
    Thread #30 Batch n:146 Range [18329..19853] Sum:2790654
    Thread #58 Batch n:146 Range [15493..16937] Sum:2365198
    Thread #60 Batch n:146 Range [22721..24109] Sum:3422310
    Thread #54 Batch n:146 Range [21317..22717] Sum:3212180
    Thread #30 Batch n:146 Range [27179..28661] Sum:4081540
    Thread #11 Batch n:146 Range [24113..25693] Sum:3640476
    Thread #34 Batch n:146 Range [25703..27143] Sum:3859484
    Thread #60 Batch n:146 Range [30223..31741] Sum:4525378
    Thread #54 Batch n:146 Range [31751..33211] Sum:4746372
    Thread #58 Batch n:146 Range [28663..30211] Sum:4297314
    Thread #30 Batch n:146 Range [33223..34693] Sum:4958972
    Thread #34 Batch n:146 Range [36307..37799] Sum:5408028
    Thread #11 Batch n:146 Range [34703..36299] Sum:5184000
    Thread #54 Batch n:146 Range [39371..40973] Sum:5865356
    Thread #60 Batch n:146 Range [37811..39367] Sum:5637612
    Thread #58 Batch n:146 Range [40993..42433] Sum:6091022
    Thread #34 Batch n:146 Range [44029..45613] Sum:6541984
    Thread #54 Batch n:146 Range [47287..48817] Sum:7013764
    Thread #30 Batch n:146 Range [42437..44027] Sum:6308156
    Thread #11 Batch n:146 Range [45631..47279] Sum:6780582
    Thread #58 Batch n:146 Range [50341..51913] Sum:7470486
    Thread #34 Batch n:146 Range [51929..53569] Sum:7701048
    Thread #60 Batch n:146 Range [48821..50333] Sum:7239008
    Thread #54 Batch n:146 Range [53591..55147] Sum:7934798
    Thread #11 Batch n:146 Range [56713..58211] Sum:8388956
    Thread #58 Batch n:146 Range [58217..59771] Sum:8617316
    Thread #30 Batch n:146 Range [55163..56711] Sum:8169020
    Thread #60 Batch n:146 Range [61519..63197] Sum:9100594
    Thread #34 Batch n:146 Range [59779..61511] Sum:8856806
    Thread #54 Batch n:146 Range [63199..64849] Sum:9339328
    Thread #11 Batch n:146 Range [64853..66457] Sum:9580694
    Thread #58 Batch n:146 Range [66463..67979] Sum:9816826
    Thread #30 Batch n:146 Range [67987..69779] Sum:10057662
    Thread #54 Batch n:146 Range [72931..74573] Sum:10770902
    Thread #34 Batch n:146 Range [71347..72923] Sum:10529702
    Thread #60 Batch n:146 Range [69809..71341] Sum:10304156
    Thread #11 Batch n:146 Range [74587..76231] Sum:11008056
    Thread #58 Batch n:146 Range [76243..77801] Sum:11251048
    Thread #30 Batch n:146 Range [77813..79561] Sum:11491034
    Thread #34 Batch n:146 Range [81119..82729] Sum:11963076
    Thread #60 Batch n:146 Range [82757..84449] Sum:12207776
    Thread #58 Batch n:146 Range [86183..87767] Sum:12700772
    Thread #54 Batch n:146 Range [79579..81101] Sum:11732042
    Thread #11 Batch n:146 Range [84457..86179] Sum:12455242
    Thread #30 Batch n:146 Range [87793..89527] Sum:12951322
    Thread #34 Batch n:146 Range [89533..91153] Sum:13187046
    Thread #54 Batch n:146 Range [94441..96013] Sum:13904802
    Thread #30 Batch n:146 Range [97829..99487] Sum:14403556
    Thread #58 Batch n:146 Range [92779..94439] Sum:13665032
    Thread #60 Batch n:146 Range [91159..92767] Sum:13431876
    Thread #11 Batch n:146 Range [96017..97813] Sum:14148718
    Thread #34 Batch n:46 Range [99497..99991] Sum:4588078

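    ¹ You can print them by uncommenting that line. Note that Boost 1.70 doesn't have that streaming implemented, and Boost 1.72 has a bug regarding boost::process::async_pipe, so you need 1.73 to actually print the headers like that.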

    Regarding c++11 - How to read data from the Internet with multiple threads while connecting only once?, we found a similar question on Stack Overflow:

