gpt4 book ai didi

c++ - 使用 boost::asio::async_write 和 boost::process::async_pipe 多次写入子进程的标准输入

转载 作者:行者123 更新时间:2023-12-05 06:07:51 29 4
gpt4 key购买 nike

注意:我在本文中交替使用“客户端”和“子”这两个词来指代从“服务器”启动的进程。

我正在使用 boost::process::async_pipe 编写我使用 boost::process::child 启动的进程的 STDIN。假设我的服务器程序看起来像这样:

(这不是一个有效的服务器演示)

服务器.cpp

int main()
{
using namespace std::chrono_literals;

boost::process::async_pipe writePipe;
boost::process::child child { "client", boost::process::std_in < _writePipe };

std::vector<char> buffer;
buffer.resize(1024u * 1024u);

while (working)
{
auto length = 0u;

/*
do a bunch of work that takes a long time
and also determines `length`, in this case I'm
adding a sleep to simulate the time between
calls to `async_write()`
*/

std::this_thread::sleep_for(5s);

boost::asio::async_write(writePipe,
boost::asio::buffer(buffer.data(), length),
[&writePipe](boost::system::error_code, size_t)
{
// writePipe.close();
});


/*
I know that this code as-is would have issues with
synchronizing `buffer`, but for the purpose of this
question please ignore that
*/
}
}

基本上我有一个内存缓冲区,我在其中做一些工作,并且我经常想向子进程发送一些二进制数据。我的子进程看起来像这样:

child .cpp

#include <iostream>
#include <string_view>

void print_hex(const char* p, std::size_t size)
{
std::string_view input(p, size);

static const char* const lut = "0123456789ABCDEF";
size_t len = input.length();

std::string output;
output.reserve(2 * len);
for (size_t i = 0; i < len; ++i)
{
const unsigned char c = static_cast<const unsigned char>(input[i]);
// output.append("0x");
output.push_back(lut[c >> 4]);
output.push_back(lut[c & 15]);
output.append(" ");
}

if (output.size() > 0) output.pop_back();
std::cout << "HEX (" << size<< "): " << output << std::endl;
}

int main()
{
std::vector<char> buffer;
buffer.resize(BUFFER_SIZE);

bool done = false;
while (!done)
{
auto rdbuf = std::cin.rdbuf();
while (auto count = rdbuf->sgetn(buffer.data(), BUFFER_SIZE))
{
print_hex(buffer.data(), count);
}
}
}

随着 writePipe.close() 被注释掉,我注意到我的子程序在服务器进程终止之前从未获得任何数据。如果我取消注释关闭管道的调用,那么我只能处理第一次调用 boost::asio::async_write() 时的数据。

编辑:

不幸的是@sehe 的原始答案没有解决问题。我稍微更新了服务器代码以更好地说明问题(并且我解决了保留/调整大小问题)。

然而,当再次环顾四周时,我读到了一些关于 sgetn() 的语言,它说:

The default definition of xsgetn in streambuf retrieves charactersfrom the controlled input sequence and stores them in the arraypointed by s, until either n characters have been extracted or the endof the sequence is reached.

因此,我重构了我的客户端,首先询问流有多少字节可用,然后分块读取流。这是我的第一次尝试:

    bool done = false;
while (!done)
{
auto rdbuf = std::cin.rdbuf();
const auto available = rdbuf->in_avail();
if (available == 0)
{
continue;
}

auto bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available));
auto bytesRead = rdbuf->sgetn(buffer.data(), bytesToRead);
print_hex(buffer.data(), bytesRead);

while (bytesRead < available)
{
bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available - bytesRead));
bytesRead += rdbuf->sgetn(buffer.data(), bytesToRead);
print_hex(buffer.data(), bytesRead);
}
}

但是,即使在添加了 std::cin.sync_with_stdio(false); (来自答案 Why does in_avail() output zero even if the stream has some char? )之后,对 rdbuf->in_avail() 的调用总是返回0。即使我在服务器外部和命令行上尝试,如:ls |客户端

我希望我的客户端程序读取传入的数据,而不必 (1) 关闭服务器进程或 (2) 关闭管道(除非我可以重新打开管道以执行后续 write ()。

谢谢!

最佳答案

哎呀。我花了很多时间调整服务器直到它工作。

原来有...客户端中的错误。

基本上,在你写 .reserve(...) 的地方你应该放 .resize():

buffer.resize(BUFFER_SIZE);

A similar bug also possibly exists in the server, even you don't show the full code, the reserve there seems odd.

现在,我不确定服务器部分实际上需要多少更改,但让我把它放在我最终成功测试它的地方。

完全异步

Live On Coliru

#include <boost/asio/io_service.hpp>
#include <boost/process.hpp>
#include <iostream>
#include <random>

static std::mt19937 prng{ std::random_device{}() };

static std::uniform_int_distribution<size_t> lendist(10, 32);
static std::uniform_int_distribution<char> a_z('a', 'z');

static size_t gen_length() { return lendist(prng); }
static char gen_alpha() { return a_z(prng); }

namespace bp = boost::process;
namespace ba = boost::asio;
using namespace std::chrono_literals;

int main() {
ba::io_service io; // one thread
//ba::io_service::strand strand(io);
auto& strand = io;
bp::async_pipe writePipe(io);
//bp::child child(bp::exe("/home/sehe/Projects/stackoverflow/child.exe"),
bp::child child(bp::exe("./child.exe"),
io,
bp::std_in < writePipe,
bp::std_out > "child.log");

auto shutdown_sequence = [&] {
std::this_thread::sleep_for(1s);
std::clog << "Closing" << std::endl;
post(strand, [&] { writePipe.close(); });
};

std::function<void()> work_loop;

work_loop = [&, buffer = std::vector<char>(1 << 20)]() mutable {
size_t length = gen_length();
std::generate_n(buffer.data(), length, gen_alpha);

async_write(writePipe, bp::buffer(buffer.data(), length),
[&strand, &shutdown_sequence, &work_loop](boost::system::error_code ec, size_t tx) {
std::clog << "Wrote " << tx << " bytes (" << ec.message() << ")" << std::endl;
if (ec || (tx == 29)) { // magic length indicates "work done"
post(strand, shutdown_sequence);
} else {
post(strand, work_loop);
}
});
};

// start IO pump
post(strand, work_loop);
io.run();

std::clog << "Bye" << std::endl;
}

运行时,打印类似的东西

./main.exe
Wrote 13 bytes (Success)
Wrote 11 bytes (Success)
Wrote 26 bytes (Success)
Wrote 32 bytes (Success)
Wrote 17 bytes (Success)
Wrote 24 bytes (Success)
Wrote 28 bytes (Success)
Wrote 29 bytes (Success)
Closing
Bye

同时还写了一个child.log:

HEX (32): 71 74 79 6E 77 74 66 63 74 72 66 6D 6D 69 6E 68 73 61 6F 75 68 77 69 77 6B 65 77 6F 76 6D 76 6E
HEX (32): 68 67 72 79 77 7A 74 68 6A 77 65 63 78 64 66 76 6A 61 64 7A 72 6C 74 6E 63 74 6B 71 64 73 7A 70
HEX (32): 70 73 77 79 75 61 70 7A 6D 73 6F 77 68 71 6A 6B 62 6F 77 63 70 63 6D 74 79 70 70 67 6B 64 75 63
HEX (32): 78 6A 79 65 78 68 74 69 75 7A 67 73 67 63 6D 69 73 65 64 63 67 6C 72 75 72 66 76 79 74 75 61 6F
HEX (32): 76 69 75 6D 73 76 6E 79 72 6C 77 6D 69 6F 74 71 6D 76 77 6F 6E 70 73 77 6C 6B 75 68 76 74 71 74
HEX (20): 79 71 77 77 61 75 71 6A 73 68 67 71 72 7A 77 6C 66 67 74 67

关于c++ - 使用 boost::asio::async_write 和 boost::process::async_pipe 多次写入子进程的标准输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65313352/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com