c++11 - How can I read data from the Internet with multiple threads while connecting only once?


I'm building a small multi-threaded downloader using boost::asio::ip::tcp. I need each thread to handle a part of the data. I know this could be solved by adding a "Range: bytes=xx-xx" header to the request, but I don't want the program to connect to the server multiple times. Is there any solution?
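For reference, a minimal sketch of what such a ranged request could look like with Boost.Beast (the helper name and the byte range shown are only illustrative):

#include <boost/beast/http.hpp>
#include <string>

namespace http = boost::beast::http;

// Hypothetical helper: build a GET that asks for the first 1 MiB only.
http::request<http::empty_body> make_range_request(std::string const& host,
                                                   std::string const& path) {
    http::request<http::empty_body> req{http::verb::get, path, 11};
    req.set(http::field::host, host);
    req.set(http::field::user_agent, "test");
    req.set(http::field::range, "bytes=0-1048575"); // RFC 7233 byte range
    return req;
}

Each downloader thread would use a different byte range, at the cost of one connection per thread, which is exactly what the question wants to avoid.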

Best Answer

Just read it, and dispatch to worker threads when appropriate.

Since I don't know what kind of chunks you want to process separately, let's assume
you read all the primes from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, and then do some processing on all the primes on separate threads.

What Is The Job?

Here's a lazy prime-processing job:

void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}

Yes, we just print a description of the job parameters and their sum. We can doodle on it a bit to make it more lifelike, for example by making it take some time, and by remembering that we are on worker threads, so we want to synchronize access to the console.
void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

Okay, that's enough detail for the unimportant bit.

The Plan

Well, the approach I chose is slightly over-complicated, because the site not only uses https (ugh), it also serves ZIP files (ugh). And we are using C++ (ugh?).

At the very least we can do the whole SSL-connection business synchronously in not much code, but we want the reads to be asynchronous, because that way we can demonstrate that
  • you can use Boost Asio
  • to do a lot of intermixed IO on just the main thread
  • the same goes for Boost Process, which launches zcat as a child process to decompress the primes content (we assume a UNIX-like system with zcat installed)
  • which means we'll be asynchronously writing to that child process's stdin
  • and asynchronously reading from its stdout
  • spawning off batch jobs as soon as they are ready

  • This should be a good model for your workload, because the workers take more time than the IO; yet we do many IO tasks on a single thread without blocking.

Let's Get The Data

As mentioned, we'll use a single thread for IO and a thread pool for the batch workers:
int main() {
    net::io_context io;       // main thread does all io
    net::thread_pool pool(6); // worker threads

There. That's a start. Now we want an SSL connection and to request that ZIP. Here it is:
http::response_parser<http::buffer_body> response_reader;
beast::flat_buffer lookahead; // for the response_reader
std::array<char,512> buf{0};  // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);

{ // synchronously write request
    std::string host = "www.mathsisfun.com";
    connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
    http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

    http::read_header(s, lookahead, response_reader);
    //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
}

Yes, that already completes reading the response headers¹. Of course we cheated, because we need three helpers:
  • making an ssl context
auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}
  • connecting over SSL
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}
  • making the HTTP request
auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}

Not bad, for C++.

Piping It Into zcat

Now we start on the asynchronous part: let's have a "pump" or "loop" that sends all the response data into a pipe:
// now, asynchronously read contents
process::async_pipe pipe_to_zcat(io);

std::function<void(error_code, size_t)> receive_zip;
receive_zip is what we call the loop. It is a self-chaining asynchronous operation, so every time it is invoked it pumps some data into the pipe and queues one more async_read for the HTTP response:
receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
    (error_code ec, size_t /*ignore_this*/)
{
    auto& res = response_reader.get();
    auto& body = res.body();
    if (body.data) {
        auto n = sizeof(buf) - body.size;
        net::write(pipe_to_zcat, net::buffer(buf, n));
    }

    bool done = ec && !(ec == http::error::need_buffer);
    done += response_reader.is_done();

    if (done) {
        std::cerr << "receive_zip: " << ec.message() << std::endl;
        pipe_to_zcat.close();
    } else {
        body.data = buf.data();
        body.size = buf.size();

        http::async_read(s, lookahead, response_reader, receive_zip);
    }
};

    This slightly complicated looking reading of a buffered response is almost literally from the documentation here.



Now, all we have to do is prime the pump:
    // kick off receive loop
    receive_zip(error_code{}, 0);

Intermezzo, Unzip It

This isn't the interesting part, so let's get it over with: we launch a zcat child process and want a second pipe to read its output from:
process::async_pipe zcat_output(io);
process::child zcat(
    process::search_path("zcat"),
    process::std_in  < pipe_to_zcat,
    process::std_out > zcat_output,
    process::on_exit([](int exitcode, std::error_code ec) {
        std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
    }), io);

Intermezzo over :)

(We even threw in error reporting, because, why not?)

Ah, The Good Stuff: Primes On Tap!

Now we have another asynchronous read loop, this time reading back the uncompressed primes. This is where we assemble the batch jobs to be processed on the worker pool.
    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;

Like receive_zip before, receive_primes is our loop driver; the sb buffer is just a buffer that can be read from conveniently with a std::istream, the same way you would normally read from std::cin.
receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
    {
        std::istream is(&sb);

        size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
        std::vector<size_t> batch(n);
        std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
        is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

        post(pool, std::bind(handle_batch, std::move(batch)));
    }

    if (ec) {
        std::cerr << "receive_primes: " << ec.message() << std::endl;
        zcat_output.close();
    } else {
        net::async_read_until(zcat_output, sb, "\n", receive_primes);
    }
};

Because async_read_until may read partial lines, we count the number of complete lines (n) in the buffer and pack them into a vector. After making sure we eat the pending newline, we... post the batch job, finally:
     post(pool, std::bind(handle_batch, std::move(batch)));

    We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.



Kick off that pump as well:
    // kick off handler loop as well:
    receive_primes(error_code{}, 0);

Putting It All Together

Right, get ready for the anticlimax. With all the asynchronous chains set up, all we need to do is... wait.
    io.run();
    pool.join();
} // end of main
io.run() keeps both pumps running and awaits the child process, all on the main thread, just as we like it.
pool.join() waits for all batch jobs to complete before stopping the thread pool. If you omit that line, you may not run all the tasks, because the destructor of thread_pool calls stop() before it calls join().
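As a standalone illustration of that last point (a minimal sketch, independent of the downloader above), an explicit join guarantees that queued work runs before the pool is torn down:

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    boost::asio::thread_pool pool(2);

    // queue more work than the pool can start immediately
    for (int i = 0; i < 8; ++i) {
        post(pool, [i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::cout << "task " << i << " done\n";
        });
    }

    pool.join(); // remove this and the destructor stops the pool first,
                 // so tasks that haven't started yet may never run
}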

    Toy around with the buffer size (512 bytes in my example) to see how large batches become. Note that 512 bytes is compressed bytes.



"Live" Demo

Sadly, as far as I know no online compiler supports external network access, so you will have to run this one yourself. For convenience, here is a full listing, along with sample output from a run on my machine:

    Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>

void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

namespace net     = boost::asio;
namespace ssl     = net::ssl;
namespace beast   = boost::beast;
namespace http    = beast::http;
namespace process = boost::process;

using boost::system::error_code;
using boost::system::system_error;
using net::ip::tcp;
using stream = ssl::stream<tcp::socket>;

auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}

void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}

auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}

int main() {
    net::io_context io;       // main thread does all io
    net::thread_pool pool(6); // worker threads

    // outside for lifetime
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0};  // for download content
    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    { // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }

    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();

            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };

    // kick off receive loop
    receive_zip(error_code{}, 0);

    process::async_pipe zcat_output(io);
    process::child zcat(
        process::search_path("zcat"),
        process::std_in  < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }), io);

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        {
            std::istream is(&sb);

            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
    // kick off handler loop as well:
    receive_primes(error_code{}, 0);

    io.run();
    pool.join();
}

Output:
    GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
    User-Agent: test
    Host: www.mathsisfun.com


    receive_zip: Success
    Child process exited with 0 (Success)
    receive_primes: End of file
    Thread #11 Batch n:95 Range [599..1237] Sum:86587
    Thread #58 Batch n:170 Range [1249..2549] Sum:320714
    Thread #34 Batch n:170 Range [2551..3919] Sum:549880
    Thread #54 Batch n:170 Range [3923..5407] Sum:790922
    Thread #30 Batch n:170 Range [5413..6863] Sum:1040712
    Thread #60 Batch n:108 Range [2..593] Sum:28697
    Thread #58 Batch n:170 Range [8429..9923] Sum:1560462
    Thread #11 Batch n:170 Range [6869..8423] Sum:1298732
    Thread #30 Batch n:146 Range [12703..14087] Sum:1956410
    Thread #34 Batch n:147 Range [9929..11329] Sum:1563023
    Thread #54 Batch n:146 Range [11351..12697] Sum:1758964
    Thread #60 Batch n:146 Range [14107..15473] Sum:2164462
    Thread #11 Batch n:146 Range [16943..18313] Sum:2576764
    Thread #34 Batch n:146 Range [19861..21313] Sum:3003048
    Thread #30 Batch n:146 Range [18329..19853] Sum:2790654
    Thread #58 Batch n:146 Range [15493..16937] Sum:2365198
    Thread #60 Batch n:146 Range [22721..24109] Sum:3422310
    Thread #54 Batch n:146 Range [21317..22717] Sum:3212180
    Thread #30 Batch n:146 Range [27179..28661] Sum:4081540
    Thread #11 Batch n:146 Range [24113..25693] Sum:3640476
    Thread #34 Batch n:146 Range [25703..27143] Sum:3859484
    Thread #60 Batch n:146 Range [30223..31741] Sum:4525378
    Thread #54 Batch n:146 Range [31751..33211] Sum:4746372
    Thread #58 Batch n:146 Range [28663..30211] Sum:4297314
    Thread #30 Batch n:146 Range [33223..34693] Sum:4958972
    Thread #34 Batch n:146 Range [36307..37799] Sum:5408028
    Thread #11 Batch n:146 Range [34703..36299] Sum:5184000
    Thread #54 Batch n:146 Range [39371..40973] Sum:5865356
    Thread #60 Batch n:146 Range [37811..39367] Sum:5637612
    Thread #58 Batch n:146 Range [40993..42433] Sum:6091022
    Thread #34 Batch n:146 Range [44029..45613] Sum:6541984
    Thread #54 Batch n:146 Range [47287..48817] Sum:7013764
    Thread #30 Batch n:146 Range [42437..44027] Sum:6308156
    Thread #11 Batch n:146 Range [45631..47279] Sum:6780582
    Thread #58 Batch n:146 Range [50341..51913] Sum:7470486
    Thread #34 Batch n:146 Range [51929..53569] Sum:7701048
    Thread #60 Batch n:146 Range [48821..50333] Sum:7239008
    Thread #54 Batch n:146 Range [53591..55147] Sum:7934798
    Thread #11 Batch n:146 Range [56713..58211] Sum:8388956
    Thread #58 Batch n:146 Range [58217..59771] Sum:8617316
    Thread #30 Batch n:146 Range [55163..56711] Sum:8169020
    Thread #60 Batch n:146 Range [61519..63197] Sum:9100594
    Thread #34 Batch n:146 Range [59779..61511] Sum:8856806
    Thread #54 Batch n:146 Range [63199..64849] Sum:9339328
    Thread #11 Batch n:146 Range [64853..66457] Sum:9580694
    Thread #58 Batch n:146 Range [66463..67979] Sum:9816826
    Thread #30 Batch n:146 Range [67987..69779] Sum:10057662
    Thread #54 Batch n:146 Range [72931..74573] Sum:10770902
    Thread #34 Batch n:146 Range [71347..72923] Sum:10529702
    Thread #60 Batch n:146 Range [69809..71341] Sum:10304156
    Thread #11 Batch n:146 Range [74587..76231] Sum:11008056
    Thread #58 Batch n:146 Range [76243..77801] Sum:11251048
    Thread #30 Batch n:146 Range [77813..79561] Sum:11491034
    Thread #34 Batch n:146 Range [81119..82729] Sum:11963076
    Thread #60 Batch n:146 Range [82757..84449] Sum:12207776
    Thread #58 Batch n:146 Range [86183..87767] Sum:12700772
    Thread #54 Batch n:146 Range [79579..81101] Sum:11732042
    Thread #11 Batch n:146 Range [84457..86179] Sum:12455242
    Thread #30 Batch n:146 Range [87793..89527] Sum:12951322
    Thread #34 Batch n:146 Range [89533..91153] Sum:13187046
    Thread #54 Batch n:146 Range [94441..96013] Sum:13904802
    Thread #30 Batch n:146 Range [97829..99487] Sum:14403556
    Thread #58 Batch n:146 Range [92779..94439] Sum:13665032
    Thread #60 Batch n:146 Range [91159..92767] Sum:13431876
    Thread #11 Batch n:146 Range [96017..97813] Sum:14148718
    Thread #34 Batch n:46 Range [99497..99991] Sum:4588078

¹ You can print them by un-commenting that line. Note that Boost 1.70 does not have the streaming implemented, and 1.72 has a bug around boost::process::async_pipe, so you need 1.73 to actually print the headers like that.

Regarding "c++11 - How can I read data from the Internet with multiple threads while connecting only once?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61787334/
