libco 协程,被分类到非对称协程。分类的方法是:
- 对称协程(典型的如 golang 的 goroutine),一个协程 yield 后,执行机会不会直接返还到调用者,而是调度器来决定唤起哪个协程。
- 非对称协程(例如 libco ),一个协程 yield 后,会返还到调用者,继续执行调用者的函数。
非对称在于程序控制流转移到被调协程时使用的是 call/resume 操作,而当被调协程让出 CPU 时使用的却是 return/yield 操作。此外,协程间的地位也不对等,caller 与 callee 关系是确定的,不可更改的,非对称协程只能返回最初调用它的协程。
在 libco 的实现中,每个线程会有一个 stack 来维护当前的协程列表,调用 co_resume
则会压入一个协程,调用 co_yield
则会弹出一个协程(实际的实现是栈顶指针的加减)。
所以,在实际使用过程中,如果涉及到复杂场景,需要用到 co_yield
来自己调度 libco
的协程。那么怎么来实现呢,有没有什么套路。我们可以参考这个自带例子来学习一下。
因为需要自己调度协程,所以一般要把协程指针和对应任务信息封装到一个结构体中,同时使用一个 stack/queue
保存所有的协程(因为一个协程 yield 之后就找不到了,没办法自动继续执行)。
struct task_t // 一个任务结构体
{
stCoRoutine_t *co; // 对应的协程指针
int fd; // 协程负责任务的参数
};
static stack<task_t*> g_readwrite; // 开一个全局变量来保存所有的任务
为了方便理解,可以先忽略例子中的多进程部分代码。代码先创建了若干个 readwrite_routine
协程
for(int i=0;i<cnt;i++)
{
task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
task->fd = -1; // 特殊值
co_create( &(task->co),NULL,readwrite_routine,task );
co_resume( task->co ); // 压到 stack 中开始执行,跳转到下面的函数
}
static void *readwrite_routine( void *arg )
{
co_enable_hook_sys(); // hook read/write 等io调用走协程实现
task_t *co = (task_t*)arg;
char buf[ 1024 * 16 ];
for(;;)
{
if( -1 == co->fd ) // 第一次进来是 -1 特殊值,将 task 保存到 g_readwrite
{
g_readwrite.push( co );
co_yield_ct(); // 让出 cpu 到调用者,也就是主线程,继续创建协程。
} // 然后会进入到 accept_routine continue;
int fd = co->fd; // 从 accept_routine 切换回来,这个时候 fd 有值了
co->fd = -1; // 标记为特殊值,下次循环会 yield 出去
for(;;)
{ // 任务逻辑
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000);
int ret = read( fd,buf,sizeof(buf) );
if( ret > 0 )
{
ret = write( fd,buf,ret );
}
if( ret > 0 || ( -1 == ret && EAGAIN == errno ) )
{
continue;
}
close( fd );
break;
}
}
return 0;
}
static void *accept_routine( void * )
{
co_enable_hook_sys();
for(;;)
{
if( g_readwrite.empty() ) // g_readwrite 为空时,等待1s
{
struct pollfd pf = { 0 };
pf.fd = -1;
poll( &pf,1,1000);
continue;
}
struct sockaddr_in addr; //maybe sockaddr_un;
memset( &addr,0,sizeof(addr) );
socklen_t len = sizeof(addr);
int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len);
if( fd < 0 )
{
struct pollfd pf = { 0 };
pf.fd = g_listen_fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000 ); // co_accept/co_poll注册回调,在有数据的时候swap回来,然后让出 cpu,执行 co_eventloop
continue;
}
if( g_readwrite.empty() ) // double check
{
close( fd );
continue;
}
SetNonBlock( fd );
task_t *co = g_readwrite.top(); // 取一个协程,分配这个 accept 的 fd
co->fd = fd;
g_readwrite.pop();
co_resume( co->co ); // 切换到改协程,继续执行 readwrite_routine
}
return 0;
}
总结一下
使用 libco 自己调度协程的实现模式有:
- 用一个全局队列保存所有的任务执行协程;
- 使用 task 结构体保存协程指针和任务参数;
- 任务执行协程初始化后压到协程队列,让出 cpu 给调用者;
- 任务分发协程在没有可用协程时(协程队列为空),等待任务协程空闲。有任务时,更新任务参数后唤醒执行协程。