gpt4 book ai didi

c++ - Boost asio,单个 TCP 服务器,多个客户端

转载 作者:可可西里 更新时间:2023-11-01 02:36:53 36 4
gpt4 key购买 nike

我正在创建一个将使用 boost asio 的 TCP 服务器,它将接受来自许多客户端的连接、接收数据并发送确认。问题是我希望能够接受所有客户,但我一次只想与一个客户合作。我希望所有其他事务都保持在队列中。

例子:

  1. Client1 连接
  2. Client2 连接
  3. Client1发送数据并请求回复
  4. Client2发送数据并请求回复
  5. Client2的请求被放入队列
  6. Client1的数据被读取,服务器回复,事务结束
  7. Client2的请求从队列中取出,服务器读取数据,回复事务结束。

所以这是介于异步服务器和阻塞服务器之间的东西。我只想一次做一件事,但同时我希望能够将所有客户端套接字及其需求存储在队列中。

我能够使用我需要的所有功能创建服务器-客户端通信,但只能在单线程上进行。一旦客户端断开连接,服务器也会终止。我真的不知道如何开始实现我上面提到的内容。每次连接被接受时我应该打开新线程吗?我应该使用 async_accept 还是阻塞接受?

我读过 boost::asio 聊天示例,其中许多客户端连接到单个服务器,但这里没有我需要的排队机制。

我知道这篇文章可能有点令人困惑,但 TCP 服务器对我来说是新的,所以我对术语不够熟悉。也没有要发布的源代码,因为我只是寻求有关该项目概念的帮助。

最佳答案

继续接受。

你没有显示任何代码,但它通常看起来像

void do_accept() {
acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
std::cout << "async_accept -> " << ec.message() << "\n";
if (!ec) {
std::make_shared<Connection>(std::move(socket_))->start();
do_accept(); // THIS LINE
}
});
}

如果您不包含标记为 //THIS LINE 的行,您确实不会接受超过 1 个连接。

如果这没有帮助,请包含一些我们可以使用的代码。

为了好玩,一个演示

对于非网络部分,这仅使用标准库功能。

网络监听器

网络部分如前所述:

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <istream>

using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;

namespace Shared {
using PostRequest = std::function<void(std::istream& is)>;
}

namespace Network {

namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;

using Shared::PostRequest;

struct Connection : std::enable_shared_from_this<Connection> {
Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}

void process() {
auto self = shared_from_this();
ba::async_read(_s, _request, [this,self](error_code ec, size_t) {
if (!ec || ec == ba::error::eof) {
std::istream reader(&_request);
_poster(reader);
}
});
}

private:
tcp::socket _s;
ba::streambuf _request;
PostRequest _poster;
};

struct Server {

Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}

void run_for(Clock::duration d = 30s) {
_stop.expires_from_now(d);
_stop.async_wait([this](error_code ec) { if (!ec) _svc.post([this] { _a.close(); }); });

_a.listen();

do_accept();

_svc.run();
}
private:
void do_accept() {
_a.async_accept(_s, [this](error_code ec) {
if (!ec) {
std::make_shared<Connection>(std::move(_s), _poster)->process();
do_accept();
}
});
}

unsigned short _port;
PostRequest _poster;

ba::io_service _svc;
ba::high_resolution_timer _stop { _svc };
tcp::acceptor _a { _svc, tcp::endpoint {{}, _port } };
tcp::socket _s { _svc };
};
}

工作服务部分的唯一“连接”是在构造时传递给服务器的 PostRequest 处理程序:

Network::Server server(6767, handler);

我还选择了异步操作,因此我们可以有一个计时器来停止服务,即使我们不使用任何线程也是如此:

server.run_for(3s); // this blocks

工作部分

这是完全独立的,将使用线程。首先,让我们定义一个Request,和一个线程安全的Queue:

namespace Service {
struct Request {
std::vector<char> data; // or whatever you read from the sockets...
};

Request parse_request(std::istream& is) {
Request result;
result.data.assign(std::istream_iterator<char>(is), {});
return result;
}

struct Queue {
Queue(size_t max = 50) : _max(max) {}

void enqueue(Request req) {
std::unique_lock<std::mutex> lk(mx);
cv.wait(lk, [this] { return _queue.size() < _max; });
_queue.push_back(std::move(req));

cv.notify_one();
}

Request dequeue(Clock::time_point deadline) {
Request req;

{
std::unique_lock<std::mutex> lk(mx);
_peak = std::max(_peak, _queue.size());
if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
req = std::move(_queue.front());
_queue.pop_front();
cv.notify_one();
} else {
throw std::range_error("dequeue deadline");
}
}

return req;
}

size_t peak_depth() const {
std::lock_guard<std::mutex> lk(mx);
return _peak;
}

private:
mutable std::mutex mx;
mutable std::condition_variable cv;

size_t _max = 50;
size_t _peak = 0;
std::deque<Request> _queue;
};

这没什么特别的,而且实际上还没有使用线程。让我们创建一个接受队列引用的辅助函数(如果需要,可以启动 1 个以上的辅助函数):

    void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
auto const deadline = Clock::now() + d;

while(true) try {
auto r = queue.dequeue(deadline);
(std::cout << "Worker " << name << " handling request '").write(r.data.data(), r.data.size()) << "'\n";
}
catch(std::exception const& e) {
std::cout << "Worker " << name << " got " << e.what() << "\n";
break;
}
}
}

主要驱动程序

这里是队列被实例化的地方,网络服务器和一些工作线程都被启动了:

int main() {
Service::Queue queue;

auto handler = [&](std::istream& is) {
queue.enqueue(Service::parse_request(is));
};

Network::Server server(6767, handler);

std::vector<std::thread> pool;
pool.emplace_back([&queue] { Service::worker("one", queue, 6s); });
pool.emplace_back([&queue] { Service::worker("two", queue, 6s); });

server.run_for(3s); // this blocks

for (auto& thread : pool)
if (thread.joinable())
thread.join();

std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n";
}

现场演示

See It Live On Coliru

测试负载如下所示:

for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world"
do
netcat 127.0.0.1 6767 <<< "$a" || echo "not sent: '$a'"&
done
wait

它打印出类似的东西:

Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6

关于c++ - Boost asio,单个 TCP 服务器,多个客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46935025/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com