- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我想知道在这种情况下最好(最干净、最难搞砸)的清理方法是什么。
void MyClass::do_stuff(boost::asio::yield_context context) {
while (running_) {
uint32_t data = async_buffer->Read(context);
// do other stuff
}
}
Read 是一个异步等待的调用,直到有数据要读取,然后返回该数据。如果我想删除 MyClass 的这个实例,我怎样才能确保我这样做是正确的?假设这里的异步等待是通过 deadline_timer 的 async_wait 执行的。如果我取消事件,在我知道事情处于良好状态之前,我仍然必须等待线程完成执行“其他东西”(我不能加入线程,因为它是属于 io 服务的线程也可能正在处理其他工作)。我可以这样做:
MyClass::~MyClass() {
running_ = false;
read_event->CancelEvent(); // some way to cancel the deadline_timer the Read is waiting on
boost::mutex::scoped_lock lock(finished_mutex_);
if (!finished_) {
cond_.wait(lock);
}
// any other cleanup
}
void MyClass::do_stuff(boost::asio::yield_context context) {
while (running_) {
uint32_t data = async_buffer->Read(context);
// do other stuff
}
boost::mutex::scoped_lock lock(finished_mutex_);
finished_ = true;
cond.notify();
}
但我希望让这些堆叠的协程尽可能易于使用,人们并不容易认识到这种情况的存在以及需要做些什么来确保事情得到妥善清理。有没有更好的办法?我在这里尝试做的事情在更基本的层面上是错误的吗?
此外,对于事件(我所拥有的与 Tanner 的回答基本相同 here )我需要以一种我必须保持一些额外状态的方式取消它(真正的取消与正常的取消用于触发事件)——如果有多个逻辑片段在等待同一个事件,这将是不合适的。很想知道是否有更好的方法来模拟要与协程挂起/恢复一起使用的异步事件。
谢谢。
编辑:谢谢@Sehe,拍摄了一个工作示例,我认为这说明了我的意思:
class AsyncBuffer {
public:
AsyncBuffer(boost::asio::io_service& io_service) :
write_event_(io_service) {
write_event_.expires_at(boost::posix_time::pos_infin);
}
void Write(uint32_t data) {
buffer_.push_back(data);
write_event_.cancel();
}
uint32_t Read(boost::asio::yield_context context) {
if (buffer_.empty()) {
write_event_.async_wait(context);
}
uint32_t data = buffer_.front();
buffer_.pop_front();
return data;
}
protected:
boost::asio::deadline_timer write_event_;
std::list<uint32_t> buffer_;
};
class MyClass {
public:
MyClass(boost::asio::io_service& io_service) :
running_(false), io_service_(io_service), buffer_(io_service) {
}
void Run(boost::asio::yield_context context) {
while (running_) {
boost::system::error_code ec;
uint32_t data = buffer_.Read(context[ec]);
// do something with data
}
}
void Write(uint32_t data) {
buffer_.Write(data);
}
void Start() {
running_ = true;
boost::asio::spawn(io_service_, boost::bind(&MyClass::Run, this, _1));
}
protected:
boost::atomic_bool running_;
boost::asio::io_service& io_service_;
AsyncBuffer buffer_;
};
所以在这里,假设缓冲区是空的,并且 MyClass::Run 当前在调用 Read 时挂起,所以有一个 deadline_timer.async_wait 正在等待事件触发以恢复该上下文。是时候销毁 MyClass 的这个实例了,那么我们如何确保它干净地完成。
最佳答案
一个更典型的方法是使用 boost::enable_shared_from_this
与 MyClass
,并运行绑定(bind)到共享指针的方法。
Boost Bind 支持绑定(bind)到 boost::shared_ptr<MyClass>
透明地。
这样,您可以自动让析构函数仅在最后一个用户消失时运行。
如果您创建 SSCCE,我很乐意对其进行更改,以表明我的意思。
更新
致SSCCEE:几点说明:
MyClass
的方式拨入 AsyncBuffer
成员函数直接不是线程安全的。实际上没有线程安全的方法来取消生产者线程[1] 之外的事件,因为生产者已经访问了 Write
的缓冲区。荷兰国际集团这可以使用 strand 来缓解(在当前设置中,我看不到 MyClass 可能是线程安全的)。或者,查看事件对象模式(Tanner 在 SO 上有一个很好的答案[2])。
为了简单起见,我在这里选择了 strand 方法,所以我们这样做:
void MyClass::Write(uint32_t data) {
strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
}
你问
Also, for the event (what I have is basically the same as Tanner's answer here) I need to cancel it in a way that I'd have to keep some extra state (a true cancel vs. the normal cancel used to fire the event)
这种状态最自然的地方是 deadline_timer 的通常位置:它是 deadline。通过重置计时器来停止缓冲区:
void AsyncBuffer::Stop() { // not threadsafe!
write_event_.expires_from_now(boost::posix_time::seconds(-1));
}
这会立即取消计时器,但可以检测到,因为截止日期已过去。
这是一个简单的演示,其中包含一组 IO 服务线程,一个生成随机数的“生产者协程”和一个狙击 MyClass::Run
的“狙击线程” 2 秒后协程。主线程是狙击线程。
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <list>
#include <iostream>
// for refcounting:
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
namespace asio = boost::asio;
class AsyncBuffer {
friend class MyClass;
protected:
AsyncBuffer(boost::asio::io_service &io_service) : write_event_(io_service) {
write_event_.expires_at(boost::posix_time::pos_infin);
}
void Write(uint32_t data) {
buffer_.push_back(data);
write_event_.cancel();
}
uint32_t Read(boost::asio::yield_context context) {
if (buffer_.empty()) {
boost::system::error_code ec;
write_event_.async_wait(context[ec]);
if (ec != boost::asio::error::operation_aborted || write_event_.expires_from_now().is_negative())
{
if (context.ec_)
*context.ec_ = boost::asio::error::operation_aborted;
return 0;
}
}
uint32_t data = buffer_.front();
buffer_.pop_front();
return data;
}
void Stop() {
write_event_.expires_from_now(boost::posix_time::seconds(-1));
}
private:
boost::asio::deadline_timer write_event_;
std::list<uint32_t> buffer_;
};
class MyClass : public boost::enable_shared_from_this<MyClass> {
boost::atomic_bool stopped_;
public:
MyClass(boost::asio::io_service &io_service) : stopped_(false), buffer_(io_service), strand_(io_service) {}
void Run(boost::asio::yield_context context) {
while (!stopped_) {
boost::system::error_code ec;
uint32_t data = buffer_.Read(context[ec]);
if (ec == boost::asio::error::operation_aborted)
break;
// do something with data
std::cout << data << " " << std::flush;
}
std::cout << "EOF\n";
}
bool Write(uint32_t data) {
if (!stopped_) {
strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
}
return !stopped_;
}
void Start() {
if (!stopped_) {
stopped_ = false;
boost::asio::spawn(strand_, boost::bind(&MyClass::Run, shared_from_this(), _1));
}
}
void Stop() {
stopped_ = true;
strand_.post(boost::bind(&AsyncBuffer::Stop, &buffer_));
}
~MyClass() {
std::cout << "MyClass destructed because no coroutines hold a reference to it anymore\n";
}
protected:
AsyncBuffer buffer_;
boost::asio::strand strand_;
};
int main()
{
boost::thread_group tg;
asio::io_service svc;
{
// Start the consumer:
auto instance = boost::make_shared<MyClass>(svc);
instance->Start();
// Sniper in 2 seconds :)
boost::thread([instance]{
boost::this_thread::sleep_for(boost::chrono::seconds(2));
instance->Stop();
}).detach();
// Start the producer:
auto producer_coro = [instance, &svc](asio::yield_context c) { // a bound function/function object in C++03
asio::deadline_timer tim(svc);
while (instance->Write(rand())) {
tim.expires_from_now(boost::posix_time::milliseconds(200));
tim.async_wait(c);
}
};
asio::spawn(svc, producer_coro);
// Start the service threads:
for(size_t i=0; i < boost::thread::hardware_concurrency(); ++i)
tg.create_thread(boost::bind(&asio::io_service::run, &svc));
}
// now `instance` is out of scope, it will selfdestruct after the snipe
// completed
boost::this_thread::sleep_for(boost::chrono::seconds(3)); // wait longer than the snipe
std::cout << "This is the main thread _after_ MyClass self-destructed correctly\n";
// cleanup service threads
tg.join_all();
}
[1] 逻辑线程,这可能是在不同线程上恢复的协程
关于c++ - 使用挂起的协程进行适当的清理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26739647/
这段代码在 Java 中的等价物是什么?我放了一部分,我对 I/O 部分感兴趣: int fd = open(FILE_NAME, O_WRONLY); int ret = 0; if (fd =
我正在尝试将维度为 d1,d2,d3 的张量 M[a1,a2,a3] reshape 为维度为 d2, d1*d3 的矩阵 M[a2,a1*a3]。我试过 M.reshape(d2,d1*d3) 但是
我是一名优秀的程序员,十分优秀!