- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 boost::asio::ip::tcp 构建一个小型的多线程下载程序。我需要每个线程处理一部分数据。我知道它可以通过向请求 header 添加“Range:bytes:xx-xx”来解决问题。但是我不想让程序多次连接到服务器。有什么解决办法吗?
最佳答案
只需阅读它并在适当的时候分派(dispatch)给工作线程。
不知道你想分别处理什么样的块,让我们假设
您从 https://www.mathsisfun.com/includes/primes-to-100k.zip 读取所有素数,分块读取它们,然后在单独的线程上对所有素数进行一些处理。
什么是工作?
这是一些懒惰的主要工作:
void handle_batch(std::vector<size_t> params) {
if (!params.empty()) {
std::cout
<< "Batch n:" << params.size()
<< "\tRange [" << params.front() << ".." << params.back() << "]"
<< "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
<< std::endl;
}
}
void handle_batch(std::vector<size_t> params) {
std::mutex s_mx;
if (!params.empty()) {
// emulate some work, because I'm lazy
auto sum = std::accumulate(begin(params), end(params), 0ull);
// then wait some 100..200ms
{
using namespace std::chrono_literals;
std::mt19937 prng(std::random_device{}());
std::this_thread::sleep_for(
std::uniform_real_distribution<>(100,200)(prng)*1ms);
}
// simple thread id (thread::id displays ugly)
auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;
// report results to stdout
std::lock_guard lk(s_mx); // make sure the output doesn't intermix
std::cout
<< "Thread #" << std::setw(2) << std::setfill('0') << tid
<< " Batch n:" << params.size()
<< "\tRange [" << params.front() << ".." << params.back() << "]"
<< "\tSum:" << sum
<< std::endl;
}
}
zcat
作为解压缩素数内容的子进程(我们假设类 UNIX 系统安装了 zcat
)int main() {
net::io_context io; // main thread does all io
net::thread_pool pool(6); // worker threads
http::response_parser<http::buffer_body> res_reader;
beast::flat_buffer lookahead; // for the res_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);
{ // synchronously write request
std::string host = "www.mathsisfun.com";
connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
http::read_header(s, lookahead, res_reader);
//std::cerr << "Headers: " << res_reader.get().base() << std::endl;
}
auto ssl_context() {
ssl::context ctx{ssl::context::sslv23};
ctx.set_default_verify_paths();
ctx.set_verify_mode(ssl::verify_peer);
return ctx;
}
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
net::connect(s.lowest_layer(), eps);
s.lowest_layer().set_option(tcp::no_delay(true));
if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
}
s.handshake(stream::handshake_type::client);
}
auto get_request(std::string const& host, std::string const& path) {
using namespace http;
request<string_body> req;
req.version(11);
req.method(verb::get);
req.target("https://" + host + path);
req.set(field::user_agent, "test");
req.set(field::host, host);
std::cerr << req << std::endl;
return req;
}
zcat
// now, asynchoronusly read contents
process::async_pipe pipe_to_zcat(io);
std::function<void(error_code, size_t)> receive_zip;
receive_zip
就是我们所说的循环。这是一个自链接的异步操作。因此,每次调用它时,它都会将一些数据泵入管道,并为 HTTP 响应再调用一个
async_read
:
receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
(error_code ec, size_t /*ignore_this*/)
{
auto& res = response_reader.get();
auto& body = res.body();
if (body.data) {
auto n = sizeof(buf) - body.size;
net::write(pipe_to_zcat, net::buffer(buf, n));
}
bool done = ec && !(ec == http::error::need_buffer);
done += response_reader.is_done();
if (done) {
std::cerr << "receive_zip: " << ec.message() << std::endl;
pipe_to_zcat.close();
} else {
body.data = buf.data();
body.size = buf.size();
http::async_read(s, lookahead, response_reader, receive_zip);
}
};
This slightly complicated looking reading of a buffered response is almost literally from the documentation here.
// kick off receive loop
receive_zip(error_code{}, 0);
zcat
并希望第二个管道从以下位置读取输出:
process::async_pipe zcat_output(io);
process::child zcat(
process::search_path("zcat"),
process::std_in < pipe_to_zcat,
process::std_out > zcat_output,
process::on_exit([](int exitcode, std::error_code ec) {
std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
}), io);
std::function<void(error_code, size_t)> receive_primes;
net::streambuf sb;
receive_zip
一样,
receive_primes
是我们的循环驱动程序,
sb
缓冲区只是一个缓冲区,它可以像通常从
std::istream
那样使用
std::cin
轻松读取。
receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
{
std::istream is(&sb);
size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
std::vector<size_t> batch(n);
std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant
post(pool, std::bind(handle_batch, std::move(batch)));
}
if (ec) {
std::cerr << "receive_primes: " << ec.message() << std::endl;
zcat_output.close();
} else {
net::async_read_until(zcat_output, sb, "\n", receive_primes);
}
};
async_read_until
可能会读取部分行,我们计算缓冲区中完整行的数量 (
n
) 并将它们打包成一个向量。在我们确保我们吃掉即将到来的换行符之后,我们......发布批处理作业,最后:
post(pool, std::bind(handle_batch, std::move(batch)));
We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.
// kick off handler loop as well:
receive_primes(error_code{}, 0);
io.run();
pool.join();
} // end of main
io.run()
继续运行两个泵并等待子进程,全部在主线程上,如我们所愿。
pool.join()
在停止线程池之前等待所有批处理作业完成。如果省略该行,则可能无法运行所有任务,因为
thread_pool
的析构函数会在调用
stop()
之前调用
join()
。
Toy around with the buffer size (512 bytes in my example) to see how large batches become. Note that 512 bytes is compressed bytes.
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iomanip>
#include <iostream>
void handle_batch(std::vector<size_t> params) {
std::mutex s_mx;
if (!params.empty()) {
// emulate some work, because I'm lazy
auto sum = std::accumulate(begin(params), end(params), 0ull);
// then wait some 100..200ms
{
using namespace std::chrono_literals;
std::mt19937 prng(std::random_device{}());
std::this_thread::sleep_for(
std::uniform_real_distribution<>(100,200)(prng)*1ms);
}
// simple thread id (thread::id displays ugly)
auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;
// report results to stdout
std::lock_guard lk(s_mx); // make sure the output doesn't intermix
std::cout
<< "Thread #" << std::setw(2) << std::setfill('0') << tid
<< " Batch n:" << params.size()
<< "\tRange [" << params.front() << ".." << params.back() << "]"
<< "\tSum:" << sum
<< std::endl;
}
}
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace process = boost::process;
using boost::system::error_code;
using boost::system::system_error;
using net::ip::tcp;
using stream = ssl::stream<tcp::socket>;
auto ssl_context() {
ssl::context ctx{ssl::context::sslv23};
ctx.set_default_verify_paths();
ctx.set_verify_mode(ssl::verify_peer);
return ctx;
}
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
net::connect(s.lowest_layer(), eps);
s.lowest_layer().set_option(tcp::no_delay(true));
if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
}
s.handshake(stream::handshake_type::client);
}
auto get_request(std::string const& host, std::string const& path) {
using namespace http;
request<string_body> req;
req.version(11);
req.method(verb::get);
req.target("https://" + host + path);
req.set(field::user_agent, "test");
req.set(field::host, host);
std::cerr << req << std::endl;
return req;
}
int main() {
net::io_context io; // main thread does all io
net::thread_pool pool(6); // worker threads
// outside for lifetime
http::response_parser<http::buffer_body> response_reader;
beast::flat_buffer lookahead; // for the response_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);
{ // synchronously write request
std::string host = "www.mathsisfun.com";
connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
http::read_header(s, lookahead, response_reader);
//std::cerr << "Headers: " << response_reader.get().base() << std::endl;
}
// now, asynchoronusly read contents
process::async_pipe pipe_to_zcat(io);
std::function<void(error_code, size_t)> receive_zip;
receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
auto& res = response_reader.get();
auto& body = res.body();
if (body.data) {
auto n = sizeof(buf) - body.size;
net::write(pipe_to_zcat, net::buffer(buf, n));
}
bool done = ec && !(ec == http::error::need_buffer);
done += response_reader.is_done();
if (done) {
std::cerr << "receive_zip: " << ec.message() << std::endl;
pipe_to_zcat.close();
} else {
body.data = buf.data();
body.size = buf.size();
http::async_read(s, lookahead, response_reader, receive_zip);
}
};
// kick off receive loop
receive_zip(error_code{}, 0);
process::async_pipe zcat_output(io);
process::child zcat(
process::search_path("zcat"),
process::std_in < pipe_to_zcat,
process::std_out > zcat_output,
process::on_exit([](int exitcode, std::error_code ec) {
std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
}), io);
std::function<void(error_code, size_t)> receive_primes;
net::streambuf sb;
receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
{
std::istream is(&sb);
size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
std::vector<size_t> batch(n);
std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant
post(pool, std::bind(handle_batch, std::move(batch)));
}
if (ec) {
std::cerr << "receive_primes: " << ec.message() << std::endl;
zcat_output.close();
} else {
net::async_read_until(zcat_output, sb, "\n", receive_primes);
}
};
// kick off handler loop as well:
receive_primes(error_code{}, 0);
io.run();
pool.join();
}
GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
User-Agent: test
Host: www.mathsisfun.com
receive_zip: Success
Child process exited with 0 (Success)
receive_primes: End of file
Thread #11 Batch n:95 Range [599..1237] Sum:86587
Thread #58 Batch n:170 Range [1249..2549] Sum:320714
Thread #34 Batch n:170 Range [2551..3919] Sum:549880
Thread #54 Batch n:170 Range [3923..5407] Sum:790922
Thread #30 Batch n:170 Range [5413..6863] Sum:1040712
Thread #60 Batch n:108 Range [2..593] Sum:28697
Thread #58 Batch n:170 Range [8429..9923] Sum:1560462
Thread #11 Batch n:170 Range [6869..8423] Sum:1298732
Thread #30 Batch n:146 Range [12703..14087] Sum:1956410
Thread #34 Batch n:147 Range [9929..11329] Sum:1563023
Thread #54 Batch n:146 Range [11351..12697] Sum:1758964
Thread #60 Batch n:146 Range [14107..15473] Sum:2164462
Thread #11 Batch n:146 Range [16943..18313] Sum:2576764
Thread #34 Batch n:146 Range [19861..21313] Sum:3003048
Thread #30 Batch n:146 Range [18329..19853] Sum:2790654
Thread #58 Batch n:146 Range [15493..16937] Sum:2365198
Thread #60 Batch n:146 Range [22721..24109] Sum:3422310
Thread #54 Batch n:146 Range [21317..22717] Sum:3212180
Thread #30 Batch n:146 Range [27179..28661] Sum:4081540
Thread #11 Batch n:146 Range [24113..25693] Sum:3640476
Thread #34 Batch n:146 Range [25703..27143] Sum:3859484
Thread #60 Batch n:146 Range [30223..31741] Sum:4525378
Thread #54 Batch n:146 Range [31751..33211] Sum:4746372
Thread #58 Batch n:146 Range [28663..30211] Sum:4297314
Thread #30 Batch n:146 Range [33223..34693] Sum:4958972
Thread #34 Batch n:146 Range [36307..37799] Sum:5408028
Thread #11 Batch n:146 Range [34703..36299] Sum:5184000
Thread #54 Batch n:146 Range [39371..40973] Sum:5865356
Thread #60 Batch n:146 Range [37811..39367] Sum:5637612
Thread #58 Batch n:146 Range [40993..42433] Sum:6091022
Thread #34 Batch n:146 Range [44029..45613] Sum:6541984
Thread #54 Batch n:146 Range [47287..48817] Sum:7013764
Thread #30 Batch n:146 Range [42437..44027] Sum:6308156
Thread #11 Batch n:146 Range [45631..47279] Sum:6780582
Thread #58 Batch n:146 Range [50341..51913] Sum:7470486
Thread #34 Batch n:146 Range [51929..53569] Sum:7701048
Thread #60 Batch n:146 Range [48821..50333] Sum:7239008
Thread #54 Batch n:146 Range [53591..55147] Sum:7934798
Thread #11 Batch n:146 Range [56713..58211] Sum:8388956
Thread #58 Batch n:146 Range [58217..59771] Sum:8617316
Thread #30 Batch n:146 Range [55163..56711] Sum:8169020
Thread #60 Batch n:146 Range [61519..63197] Sum:9100594
Thread #34 Batch n:146 Range [59779..61511] Sum:8856806
Thread #54 Batch n:146 Range [63199..64849] Sum:9339328
Thread #11 Batch n:146 Range [64853..66457] Sum:9580694
Thread #58 Batch n:146 Range [66463..67979] Sum:9816826
Thread #30 Batch n:146 Range [67987..69779] Sum:10057662
Thread #54 Batch n:146 Range [72931..74573] Sum:10770902
Thread #34 Batch n:146 Range [71347..72923] Sum:10529702
Thread #60 Batch n:146 Range [69809..71341] Sum:10304156
Thread #11 Batch n:146 Range [74587..76231] Sum:11008056
Thread #58 Batch n:146 Range [76243..77801] Sum:11251048
Thread #30 Batch n:146 Range [77813..79561] Sum:11491034
Thread #34 Batch n:146 Range [81119..82729] Sum:11963076
Thread #60 Batch n:146 Range [82757..84449] Sum:12207776
Thread #58 Batch n:146 Range [86183..87767] Sum:12700772
Thread #54 Batch n:146 Range [79579..81101] Sum:11732042
Thread #11 Batch n:146 Range [84457..86179] Sum:12455242
Thread #30 Batch n:146 Range [87793..89527] Sum:12951322
Thread #34 Batch n:146 Range [89533..91153] Sum:13187046
Thread #54 Batch n:146 Range [94441..96013] Sum:13904802
Thread #30 Batch n:146 Range [97829..99487] Sum:14403556
Thread #58 Batch n:146 Range [92779..94439] Sum:13665032
Thread #60 Batch n:146 Range [91159..92767] Sum:13431876
Thread #11 Batch n:146 Range [96017..97813] Sum:14148718
Thread #34 Batch n:46 Range [99497..99991] Sum:4588078
关于c++11 - 如何使用仅连接一次的多线程从 Internet 读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61787334/
我仅在 WIN7 PC 上收到此通知,仅使用 IE。 Firefox 总是很好,旧版 Windows 上的 IE 似乎也不错。这让我大吃一惊,我不知道为什么 IE 认为 SSL 证书有问题。有没有人以
概述 对于我产品的新版本 v1.9.0,我创建了一个新的 MSI 安装程序。该应用程序的先前版本是 v1.7.0。 卸载旧版本然后安装新版本工作正常。 但是当我尝试使用 v1.9.0 安装程序更新旧版
该网站有一个全高图像启动。更多内容位于首屏下方,图像底部有一个“滚动”元素,以提示用户发现其余内容。单击后,我成功地使网站向下滚动 300 像素。然而,我想顺利地做到这一点。这是我当前的代码: w
var i = 0; function Myfunc() { var newdiv = document.createElement('div'); var el = document
这纯粹是为了学习目的;我知道 CSS 将是这种情况下的首选方法。 我知道在 JavaScript 中,您可以使用内联事件处理将鼠标悬停在图像上,如下所示: 我知道您可以在您的站点中安装 jQuery
我只想从curl请求中获取 header curl -I www.google.com 一切都很棒。现在我想这样做,但也传递发布数据: curl -I -d'test=test' www.google
以下代码旨在更改一个字段的颜色: Untitled Document var bkColor =
我正在使用 grep 递归搜索目录,并使用以下参数希望只返回第一个匹配项。不幸的是,它返回了不止一个——事实上,我上次查看时返回了两个。似乎我有太多的争论,尤其是没有得到想要的结果。 :-/ # gr
我只想搜索当前目录中的所有文件。我试过这个 grep foo * 但我收到此错误 grep: bar: Is a directory 我也尝试过这个 grep -r foo 但这也在搜索子目录。 最佳
我正在构建一个销售点应用程序,我想打印一张收据。问题是我使用的打印机无法打印纯文本的任何图形,我在 javafx 中只能找到使用 Print API 打印节点或使用像 jasper 这样都包含图形的报
是否有任何操作系统在完全加载时仅提供用于控制台应用程序执行的 java 环境?理想情况下,它会在加载时自动启动程序 最佳答案 这是一个名称为:JavaOS 的东西 从我的角度来看,更好的方法是安装一个
在工作中,我们有一个每晚执行 mysql 数据转储的脚本。对于开发,我们通常需要使用来自最近转储的数据。一段时间以来,我们一直每天都进行数据库还原,但现在我们已经到了每天还原花费近一个小时的地步。有没
我的移动模式菜单有问题。 onClick 它淡出。我想保留此设置,但我不希望它在单击下拉部分时淡出。这是链接:http://jsfiddle.net/zLLzrs6b/3/感谢您的帮助! html:
经过大量研究和反复试验,我谦虚地向各位 CSS 专家寻求帮助。这就是我需要的: 我有两张图片:titlelogo 和 newlogo。 在全屏模式下,newlogo 需要在左边,titlelogo 在
这个问题在这里已经有了答案: Exclusive CSS selector (3 个答案) 关闭 3 年前。 我的文档结构如下: ... ... something something someth
我有一个具有以下要求的表: 所有列的宽度必须可变 所有列的宽度不得超过必要的宽度 所有单元格必须保留空白(white-space:pre/pre-wrap) 当(且仅当)超过最大定义宽度 (1000p
我正在寻找一个正则表达式来仅匹配具有特殊 字符且大小为4+ 的数字 字符串。我对此处发布的问题做了一些评论: 测试网站: http://regexlib.com/RETester.aspx 1- re
我正在为我的元素开发一个纯 CSS 灯箱解决方案。我用谷歌搜索了它,但到目前为止只找到了部分解决方案。 我正在寻找这些功能: 显示任意宽任意高的内容(无固定高/宽) 垂直居中和水平居中 如果内容宽度和
出于各种原因,我目前正在尝试使用 HTML/CSS 创建网格布局(我知道 Bootstrap 等,但在这种情况下没有选择,而且我无法添加标记元素)。 我有以下代码(容器 div,每次都有一个带有 ul
有没有办法使用String.format()格式化 double 以仅获取小数? System.out.println(String.format("%.2f", 1.23456d)); 正如预期的那
我是一名优秀的程序员,十分优秀!