整洁的异步IO代码

虽然异步IO(带有select / poll / epoll / kqueue等的非阻塞描述符)不是Web上记录最多的东西,但是有一些很好的例子。

但是,所有这些示例在确定了调用返回的句柄后,只有一个’ do_some_io(fd) ‘存根。 它们并没有真正解释如何在这种方法中最好地接近实际的异步IO。

阻止IO非常整洁,直接读取代码。 另一方面,非阻塞,异步IO是毛茸茸的,凌乱的。

有什么办法? 什么是健壮和可读的?

 void do_some_io(int fd) { switch(state) { case STEP1: ... async calls if(io_would_block) return; state = STEP2; case STEP2: ... more async calls if(io_would_block) return; state = STEP3; case STEP3: ... } } 

或者(ab)使用GCC的计算得到:

 #define concatentate(x,y) x##y #define async_read_xx(var,bytes,line) \ concatentate(jmp,line): \ if(!do_async_read(bytes,&var)) { \ schedule(EPOLLIN); \ jmp_read = &&concatentate(jmp,line); \ return; \ } // macros for making async code read like sync code #define async_read(var,bytes) \ async_read_xx(var,bytes,__LINE__) #define async_resume() \ if(jmp_read) { \ void* target = jmp_read; \ jmp_read = NULL; \ goto *target; \ } void do_some_io() { async_resume(); async_read(something,sizeof(something)); async_read(something_else,sizeof(something_else)); } 

或者可能是C ++exception和状态机,因此工作器函数可以触发中止/恢复位,或者可能是表驱动的状态机?

它不是如何使它工作,它如何让它可追溯到我追逐!

我建议看看: http : //www.kegel.com/c10k.html ,第二,看看现有的库,比如libevent,Boost.Asio已经完成了这项工作,看看它们是如何工作的。

关键是每种类型的系统调用方法可能不同:

  • 选择是简单的反应堆
  • epoll具有边缘或水平触发的接口,需要不同的方法
  • iocp是proactor需要其他方法

建议:使用良好的现有库,如Boost.Asio for C ++或libevent for C.

编辑:这是ASIO如何处理这个问题

 class connection { boost::asio:ip::tcp::socket socket_; public: void run() { // for variable length chunks async_read_until(socket_,resizable_buffer,'\n', boost::bind(&run::on_line_recieved,this,errorplacehplder); // or constant length chunks async_read(socket_,buffer(some_buf,buf_size), boost::bind(&run::on_line_recieved,this,errorplacehplder); } void on_line_recieved(error e) { // handle it run(); } }; 

因为ASIO作为proactor工作,它会在操作完成时通知您并在内部处理EWOULDBLOCK。

如果你称之为reactor,你可以模拟这种行为:

  class conn { // Application logic void run() { read_chunk(&conn::on_chunk_read,size); } void on_chunk_read() { /* do something;*/ } // Proactor wrappers void read_chunk(void (conn::*callback),int size, int start_point=0) { read(socket,buffer+start,size) if( complete ) (this->*callback() else { this -> tmp_size-=size-read; this -> tmp_start=start+read; this -> tmp_callback=callback your_event_library_register_op_on_readable(callback,socket,this); } } void callback() { read_chunk(tmp_callback,tmp_size,tmp_start); } } 

这样的事情。

状态机是一种很好的方法。 事先有点复杂性,这将为您节省未来的麻烦,未来真的很快就会开始。 😉

另一种方法是使用线程并在每个线程中的单个fd上执行阻塞I / O. 这里的权衡是你使I / O变得简单,但可能会引入同步的复杂性。

存在很好的设计模式“协程”来解决这个问题。

它是两个世界中最好的:整洁的代码,完全像同步io流和没有上下文切换的出色性能,如async io给出的。 Coroutine看起来像一个异步同步线程,带有单指令指针。 但是许多协同程序可以在一个OS线程内运行(所谓的“协作式多任务处理”)。

协程代码示例:

 void do_some_io() { blocking_read(something,sizeof(something)); blocking_read(something_else,sizeof(something_else)); blocking_write(something,sizeof(something)); } 

看起来像同步代码,但实际上控制流使用另一种方式,如下所示:

 void do_some_io() { // return control to network io scheduler, to handle another coroutine blocking_read(something,sizeof(something)); // when "something" is read, scheduler fill given buffer and resume this coroutine // return control to network io scheduler, to handle another coroutine CoroSleep( 1000 ); // scheduler create async timer and when it fires, scheduler pass control to this coroutine ... // and so on 

因此,单线程调度程序使用用户定义的代码控制许多协同程序,并对io进行整齐的同步调用。

C ++协同程序实现示例是“boost.coroutine”(实际上不是boost的一部分) http://www.crystalclearsoftware.com/soc/coroutine/这个库完全实现了协程机制,可以使用boost.asio作为调度程序和async io层。

您需要有一个提供async_schedule(),async_foreach(),async_tick()等的主循环。这些函数依次将条目放入一个全局的方法列表中,这些方法将在下次调用async_tick()时运行。 然后,您可以编写更整洁的代码,并且不包含任何switch语句。

你可以写:

 async_schedule(callback, arg, timeout); 

要么:

 async_wait(condition, callback, arg, timeout); 

然后您的条件甚至可以在另一个线程中设置(前提是您在访问该变量时负责线程安全)。

我已经在C中为我的嵌入式项目实现了异步框架,因为我想要进行非抢占式多任务处理,并且异步非常适合在主循环的每次迭代中执行一些工作来完成许多任务。

代码在这里: https : //github.com/mkschreder/fortmax-blocks/blob/master/common/kernel/async.c

您希望将“io”与处理分离,此时您阅读的代码将变得非常易读。 基本上你有:

 int read_io_event(...) { /* triggers when we get a read event from epoll/poll/whatever */ /* read data from "fd" into a vstr/buffer/whatever */ if (/* read failed */) /* return failure code to event callback */ ; if (/* "message" received */) return process_io_event(); if (/* we've read "too much" */) /* return failure code to event callback */ ; return /* keep going code for event callback */ ; } int process_io_event(...) { /* this is where you process the HTTP request/whatever */ } 

…然后真正的代码处于进程事件中,即使你有多个请求响应它也很可读,你只需在设置状态或其他后“返回read_io_event()”。