协程库

本文最后更新于 2024年5月13日 晚上

一、协程

1.1 基本概念

  • 协程可以理解为一种用户态的轻量级线程,一个线程里跑多个协程。

  • 协程分为对称协程非对称协程,对称协程就是当协程切换的时候他可以切换到任意其他的协程,比如goroutine;而非对称协程只能切换到调用他的调度器。

  • C20以前,c/c++不直接支持协程语义,但有不少开源的协程库, 目前大概有三种实现协程的方式:

    • 利用glibcucontext组件:coroutine
    • 利用汇编代码来切换上下文:libco
    • 利用C语言的setjmplongjmplibmill
  • 当然在C20的新特性中在语言级别支持了协程

1.2 解决的问题

  • 在IO密集型的程序中,CPU等待IO结果往往是非常频繁的事情,如果按照我们常规的思维写处理IO的代码,如:accept --> read --> process --> write。这个流程当中三个地方将会面临CPU的阻塞等待问题(如果使用的是阻塞IO的话),而一台服务器是需要处理成千上万的连接请求的,所以这里的阻塞等待是万万不能接受的。
  • 最早的解决方案就是开新的线程来处理每一个连接请求,这样即使发生阻塞也是在各自的线程中发生阻塞,而不会影响服务器相应别的请求。但是这样带来的问题就是连接多起来之后需要申请很多的线程资源,许多线程发生阻塞之后也会给系统带来额外的负担。
  • 接着出现的方案是使用epoll在单个线程内同时监听多个连接,当监听到指定的信号之后再执行相应的处理逻辑。这样通过较少的线程就能处理大量的连接,此方案的性能也非常的优秀。但是这种基于事件的异步处理方案的处理流程是打散,我们不能按照顺序思维写整个处理流程。
  • 最后协程其实就是为了解决异步处理逻辑混乱的方案,可以用顺序思维流程的代码写出异步处理的代码。

二、coroutine实现

参考资料:

云风coroutine源码分析

ucontext-人人都可以实现的简单协程库

  • glibc中提供了ucontext库函数用于操作上下文,基于这组提供的函数可以实现上下文的切换,从而可以实现协程。

2.1 ucontext

  • 在类System V环境中,头文件<ucontext.h>中定义了一个核心结构体和四个函数用以支持用户级的线程切换。
  • 核心结构体是ucontext_t,它是保存上下文信息的核心数据结构,基本结构如下:
    • uc_link的类型是ucontext_t*,其存储的是当前上下文运行完成之后或者被挂起时,应退出到的位置;
    • uc_sigmask为该上下文中的阻塞信号集合;
    • uc_stack为该上下文中使用的栈;
    • uc_mcontext保存上下文的特定机器表示,包括调用线程的特定寄存器等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct ucontext {
struct ucontext *uc_link; // 该上下文执行完时要恢复的上下文
sigset_t uc_sigmask;
stack_t uc_stack; //使用的栈
mcontext_t uc_mcontext;
...
} ucontext_t;

typedef struct
{
void *ss_sp;
int ss_flags;
size_t ss_size;
} stack_t;
  • 然后是核心的四个函数:
    • getcontext:初始化ucp结构体,将当前的上下文保存到ucp中。
    • makecontext:修改通过getcontext取得的上下文ucp(这意味着**调用makecontext前必须先调用getcontext**)。然后给该上下文指定一个栈空间ucp->stack,设置后继的上下文ucp->uc_link。如果这里设置的要返回的上下文为NULL,则当前线程会直接退出。
    • setcontext:设置当前的上下文为ucp,这里的ucp应该通过getcontext或者makecontext取得,如果调用成功则不返回。其实很好理解这里的直接设置上下文操作,就是直接将执行流程跳转到ucp,这里也没有去保存当前上下文,所以当前的运行环境一定是直接丢失的,也就不会返回了。当然跳转的上下文中可能设置了后继上下文ucp->uc_link,如果当前运行完之后会返回到这里记录的后继上下文。
    • swapcontext:不同于前者直接进行跳转,swapcontext会将当前的上下文信息换出并存储到oucp,而我们可以将要跳转的上下文的后继ucp->uc_link提前设置成oucp(注意这里是写指针,oucp里面当前保存的上下文其实还不是真正的返回位置),然后在调用swapcontext时第一个参数就填oucp,那就将当前上下文写入到oucp中了,于是在ucp上下文挂起或者退出的时候自然能返回当前调用swapcontext的位置了。
1
2
3
4
int getcontext(ucontext_t *ucp); //将当前上下文保存到ucp
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...); //修改上下文入口函数
int setcontext(const ucontext_t *ucp); //切换到上下文ucp
int swapcontext(ucontext_t *oucp, ucontext_t *ucp); //保存当前上下文到oucp,切换到上下文ucp
  • 上下文的核心其实就是当前寄存器状态(这包含了pc, sp以及各种通用寄存器),以及栈空间(每个线程/协程都是需要有单独的栈空间的)。当利用ucontext创建新的上下文的时候是需要我们创建新的栈空间的,而栈的大小设置是件很困难的事情。

2.2 协程库实现

  • coroutine是云风在2012年利用ucontext实现的一个非常简单的非对称协程库,其只提供了如下的几个函数接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define COROUTINE_DEAD 0  //协程状态
#define COROUTINE_READY 1
#define COROUTINE_RUNNING 2
#define COROUTINE_SUSPEND 3

struct schedule; //协程调度器

typedef void (*coroutine_func)(struct schedule *, void *ud); //协程执行函数

struct schedule * coroutine_open(void); //创建协程调度器
void coroutine_close(struct schedule *); //关闭协程调度器

int coroutine_new(struct schedule *, coroutine_func, void *ud); //用协程调度器创建一个协程
void coroutine_resume(struct schedule *, int id); //恢复id号协程
int coroutine_status(struct schedule *, int id); //返回id号协程的状态
int coroutine_running(struct schedule *); //返回正在执行的协程id
void coroutine_yield(struct schedule *); //保存上下文后中断当前协程执行
  • 这是一个非对称的协程库,我们创建的协程在运行完之后会退出到"main"的运行流程中,然后再在主流程中选择需要恢复运行的协程。

schedule

  • 首先是schedule结构体,其实就用于操作协程调度的类:
    • stack是公共使用的栈空间,每个运行的协程最终都是在这个公共的栈空间中运行的,所以协程切换的时候会涉及到栈的拷贝,后面会详细说明;
    • main其实就是用于保存主流程上下文的,便于协程发生切换的时候退出到主流程;
    • co存储协程的数组,nco, cap记录当前数组的容量和使用情况;
    • running存储当前正在运行的协程。
1
2
3
4
5
6
7
8
struct schedule {
char stack[STACK_SIZE];
ucontext_t main;
int nco;
int cap;
int running;
struct coroutine **co;
};

coroutine

  • 然后是coroutine结构体,用于存储协程的基本信息:
    • func是这个协程的函数入口,函数的定义必须符合coroutine_func类型定义,ud是入参;
    • ctx用于保存当前协程上下文,便于在挂起之后能够恢复到协程挂起的位置继续执行;
    • stack是在协程发生挂起的时候,暂存当前协程的栈信息的。
1
2
3
4
5
6
7
8
9
10
struct coroutine {
coroutine_func func;
void *ud;
ucontext_t ctx;
struct schedule * sch;
ptrdiff_t cap;
ptrdiff_t size;
int status;
char *stack;
};

coroutine_new

  • 功能比较简单,其实就是把一个函数注册成协程,并且返回协程标识符。这里可能遇到co数组空间不足的可能需要扩容。

coroutine_resume

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void 
coroutine_resume(struct schedule * S, int id) {
assert(S->running == -1);
assert(id >=0 && id < S->cap);
struct coroutine *C = S->co[id];
if (C == NULL)
return;
int status = C->status;
switch(status) {
case COROUTINE_READY:
getcontext(&C->ctx);
C->ctx.uc_stack.ss_sp = S->stack;
C->ctx.uc_stack.ss_size = STACK_SIZE;
C->ctx.uc_link = &S->main;
S->running = id;
C->status = COROUTINE_RUNNING;
uintptr_t ptr = (uintptr_t)S;
makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
swapcontext(&S->main, &C->ctx);
break;
case COROUTINE_SUSPEND:
memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
S->running = id;
C->status = COROUTINE_RUNNING;
swapcontext(&S->main, &C->ctx);
break;
default:
assert(0);
}
}
  • 恢复指定协程的运行,这里可能遇到两种情况:一种是该协程第一次运行,走COROUTINE_READY分支;第二种情况是该协程被挂起之后再运行,走COROUTINE_SUSPEND分支。
  • 首次运行需要我们设置ucontext相关的信息,所以需要调用getcontext并设置栈以及返回位置的上下文,然后通过makecontext设置协程的入口函数为mainfunc。这里并没有直接设置成用户注册的函数,是因为需要额外做一些诸如运行后释放资源的操作,所以这里包了一层函数。最后通过swapcontext切换到协程运行。
  • 如果是挂起之后的恢复,则需要恢复栈数据,就是把coroutine->stack中暂存的栈数据复制到schedule->stack公共栈空间中,然后再通过swapcontext切换到协程运行。

coroutine_yield

1
2
3
4
5
6
7
8
9
10
11
void
coroutine_yield(struct schedule * S) {
int id = S->running;
assert(id >= 0);
struct coroutine * C = S->co[id];
assert((char *)&C > S->stack);
_save_stack(C,S->stack + STACK_SIZE);
C->status = COROUTINE_SUSPEND;
S->running = -1;
swapcontext(&C->ctx , &S->main);
}
  • 挂起操作是在协程流程中调用的,挂起之前需要暂存当前协程的栈空间数据,这是通过_save_stack函数的实现的。
  • 然后就是更新状态,并通过swapcontext切换到主流程中。
  • 简单说一下_save_stack函数,用了一个很巧妙的方法,在函数内部定义了一个局部变量dummy,此时dummy的地址应该是栈顶,而栈底是已知的,这样我们就知道当前协程使用的栈的大小。

三、libco实现

  • libco是微信后台大规模使用的c/c++协程库,2013年至今稳定运行在微信后台的数万台机器上。
  • 通过提供socket族函数的hook,使得后台逻辑服务几乎不用修改逻辑代码就可以完成异步化改造。
  • 上下文切换采用的汇编实现方案,手动保存当前CPU的寄存器状态。

3.1 动态链接实现hook

参考资料:

libco源码阅读(八):hook机制

动态链接黑魔法: Hook 系统函数

3.1.1 why…?

  • 对于线程来说,当某些系统调用发生阻塞时会被系统自动挂起并等待特定的信号到来,并且把当前的CPU调度给另外的线程,这个过程对用户是完全无感的。
  • 但协程是用户态的概念,操作系统并不知道它的存在,如果我们在协程中调用了某个会阻塞的接口,则会直接将当前线程阻塞,并不会讲CPU调度给别的协程。协程系统中想要让渡CPU需要手动调用co_resumeco_yield这样的方法。
  • 因此我们如果想要在原来的代码中引入协程,肯定是需要做很多侵入式改造的,即在原来的业务逻辑中加入很多协程相关的调用。
  • 而基于动态链接实现的hook方案可以在不更改原代码的基础上修改要调用的函数的逻辑。基于此原理我们可以修改各种系统调用函数(libco主要是修改了socket函数族),从而实现非侵入式的异步化改造。

3.1.2 hook机制

  • hook机制本质上是一种函数的劫持技术,比如我们通常需要调用malloc函数来进行内存分配,那么能不能我们自己封装一个同名、同入参和同返回值的malloc函数来替代系统的malloc函数,在我们自己封装的malloc函数中实现一些特定的功能,而且也能回调系统的malloc,这就是hook机制。

  • 系统提供给我们的dlopendlsym族函数可以用来操作动态链接库,比如我们要hook系统调用函数read,我们可以使用dlsym族函数获取hook前函数的地址,这样就可以在自己实现的read中回调原函数,并加上一些额外的逻辑,并且在运行是会调用我们的版本了。

3.1.3 动态链接

  • 动态链接是指在程序编译时并不会被连接动态库到目标代码中,而是在程序运行是才被载入。不同的应用程序如果调用相同的库,那么在内存里只需要有一份该共享库的实例,规避了空间浪费问题。
  • 动态库在程序运行是才被载入,也解决了静态库对程序的更新、部署和发布页会带来麻烦。用户只需要更新动态库即可。
  • Linux中动态链接库是.so结尾的文件,本质和我们编译生成的.o文件是类似的,在加载程序或者运行程序发现需要访问动态链接库中的函数实现时,会在所有的库空间中去寻找对应的实现。而如果同一个函数接口在多个动态库中都被实现过,那么到底调用哪个实现就取决于先查找到谁。

3.1.4 环境变量LD_PRELOAD

  • LD_PRELOADLinux系统的一个环境变量,它可以影响程序的运行时的链接(Runtime linker),允许你定义在程序运行前优先加载的动态链接库。
  • 这个功能主要就是用来有选择性的载入不同动态链接库中的相同函数。通过这个环境变量,我们可以在主程序和其动态链接库的中间加载别的动态链接库,甚至覆盖正常的函数库。一方面,我们可以以此功能来使用自己的或是更好的函数(无需更改别人的源码);而另一方面,我们也可以向别人的程序注入程序,从而达到特定的目的。
  • 系统寻找动态库时加载顺序为:
    1. LD_PRELOAD
    2. LD_LIBRARY_PATH
    3. /etc/ld.so.cache
    4. /lib
    5. /usr/lib

3.1.5 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// hookread.cpp
#include <dlfcn.h>
#include <unistd.h>

#include <iostream>

typedef ssize_t (*read_pfn_t)(int fildes, void *buf, size_t nbyte);

static read_pfn_t g_sys_read_func = (read_pfn_t)dlsym(RTLD_NEXT,"read");

ssize_t read( int fd, void *buf, size_t nbyte ){
std::cout << "进入 hook read\n";
return g_sys_read_func(fd, buf, nbyte);
}

void co_enable_hook_sys(){
std::cout << "可 hook\n";
}

// main.cpp
#include <bits/stdc++.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

using namespace std;

int main(){
int fd = socket(PF_INET, SOCK_STREAM, 0);
char buffer[10000];

int res = read(fd, buffer ,10000);
return 0;
}
1
2
3
g++ -o main main.cpp
g++ -o hookread.so -fPIC -shared -D_GNU_SOURCE hookread.cpp -ldl
LD_PRELOAD=./hookread.so ./main
  • 但是libco并不是这样做的,整个libco中你都看不到LD_PRELOADlibco使用了一种特殊的方法,即通过在用户代码中包含co_hook_sys_call.cpp中定义的函数,这样也可以做到使用我们自己的库去替换系统的库。

  • 实现也很简单,就是单独编译得到hookread.o文件,然后再编译main.cppg++ main.cpp -ldl hookread.o。但这种方法需要在用户代码中引入头文件hookread.h

3.2 函数调用过程

参考资料:

学习x86-64寄存器(x64 Register Set)

x86-64寄存器和栈帧

libco源码阅读(四):协程的上下文环境

3.2.1 栈帧

stack-frame.png
  • 首先在Linux程序的虚拟地址空间中,栈段是从高地址向低地址增长的,而堆是从低地址向高地址增长,所以我们的手动申请的栈空间是需要设置成从最高地址开始使用的。
  • x86_64架构的CPU中有两个专门用来管理函数栈帧的寄存器,分别是rbp, rsp。前者是当前栈帧的底部(高地址),后者是当前栈帧的顶部(低地址)。
  • 栈帧需要完成的主要工作是为局部变量开辟空间,此外还需要处理调用函数的一些相关工作。

函数调用

  • 首先说函数调用(caller方的工作),我们需要完成的工作主要包括传递参数以及记录函数跳转之后的返回地址。写入参的过程是以当前的rsp为基地址向高地址写入的,并且参数是从右往左以此写入,所以上图青色部分是param N ... param 1。注意这里是以栈顶往高地址写数据,即存储参数的空间是调用者的栈帧空间,这里是需要提前预留出来的。
  • 参数写完之后紧接的指令就是call func调用跳转,这条指令除了将rip更换成目标函数入口位置还会同时保存函数调用后的返回位置,这个过程是完全自动完成的设置rip的操作没有体现在汇编代码中。返回地址是紧接着在栈顶位置写入。
1
2
3
4
call foo		; 等价于下面的指令

pushq %rip ; 保存下一条指令(第41行的代码地址)的地址,用于函数返回继续执行
jump foo ; 跳转到函数foo

函数进入

  • 进入一个新的函数之后首先需要保存当前寄存器中的数据,方便调用完成之后恢复状态。因为每个函数需要用到的寄存器是不同的,所以要保存那些寄存器根据需要决定。但rbprsp是肯定需要更改的,所以进入函数和推出时固定需要保存和恢复rbprsp
1
2
3
4
5
push rbp       ; 函数最开始,保存rbp到stack
mov rbp, rsp ; 扩展stack之前,保存此值,作为新stack frame的底
; ...
mov rsp, rbp ; 最后恢复
pop rbp
  • 然后需要扩展当前函数的栈帧空间,即让rsp减去一个值,局部变量需要用到的空间就分配完成了。

函数退出

  • 退出的时候需要注意将之前保存过的寄存器值进行恢复,再恢复rbprsp;然后调用ret,会自动根据当前sp地址存储的返回位置进行跳转。此时一个函数的调用就完成了。
1
2
3
ret			; 等价于下面的指令

popq %rip ; 恢复指令指针寄存器

3.2.2 传参优化

  • 根据前面的描述,当我们在为调用函数设置传入参数的时候是通过栈来存储的,但实际上函数的参数比较少的时候会使用寄存器来完成传参,比如函数只有两个参数func(int* arg1, int* arg2)的时候,会使用rdiarg1,使用rsiarg2

3.3 关键数据结构

3.3.1 协程实体

  • stCoRoutine_t相当于Linux中管理线程/进程的task_struct结构体,保存这协程的私有数据和协程切换时的上下文信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 协程的执行环境, 运行在同一个线程上的各协程是共享该结构
pfn_co_routine_t pfn; // 一个函数指针, 指向实际待执行的协程函数
void *arg; // 函数的参数
coctx_t ctx; // 用于协程切换时保存CPU上下文(context)的,即 esp、ebp、eip 和其他通用寄存器的值
char cStart; // 协程是否执行过resume
char cEnd; // 协程是否执行结束
char cIsMain; // 是否为主协程修改
char cEnableSysHook; // 此协程是否hook库函数,即用自己实现的函数替代库函数
char cIsShareStack; // 是否开启共享栈模式

void *pvEnv; // 保存程序系统环境变量的指针

//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem; // 协程运行时的栈内存

//save satck buffer while confilct on same stack_buffer;
char* stack_sp; // 保存栈顶指针sp
unsigned int save_size; // 保存协程的栈的buffer的大小
char* save_buffer; // 使用共享栈模式时,用于保存该协程的在共享栈中的内容

stCoSpec_t aSpec[1024];
};

3.3.2 协程上下文信息

  • coctx_t保存协程的上下文,实际就是寄存器的值,C/C++都没有函数可以直接接触寄存器,所以操作这个参数的时候需要嵌入一点汇编代码。
1
2
3
4
5
6
7
8
9
10
11
struct coctx_t
{
#if defined(__i386__)
void *regs[ 8 ]; // X86 32位架构下有8个通用寄存器
#else
void *regs[ 14 ]; // x86 64位下有16个寄存器,这里保存14个
#endif
size_t ss_size; // 栈的大小
char *ss_sp; // 栈顶指针esp

};

3.3.3 私有栈和共享栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct stStackMem_t
{
stCoRoutine_t* ocupy_co; // 执行时占用的那个协程实体,也就是这个栈现在是那个协程在用
int stack_size; // 当前栈上未使用的空间
char* stack_bp; // stack_buffer + stack_size
char* stack_buffer; // 栈的起始地址,当然对于主协程来说这是堆上的空间

};

struct stShareStack_t
{
unsigned int alloc_idx; // stack_array中我们在下一次调用中应该使用的那个共享栈的index
int stack_size; // 共享栈的大小,这里的大小指的是一个stStackMem_t*的大小
int count; // 共享栈的个数,共享栈可以为多个,所以以下为共享栈的数组
stStackMem_t** stack_array; // 栈的内容,这里是个数组,元素是stStackMem_t*
};
  • stStackMem_t是运行协程私有栈的结构,stShareStack_t则是共享栈的结构。libco有两种协程栈的策略:
    • 一种是一个协程分配一个栈,这也是默认的配置,因为默认大小为128KB,如果1024个协程就是128MB,这会带来较大空间的浪费。
    • 另一种策略为共享栈,即所有协程使用同一个栈,然后每个协程使用一个buffer来保存自己的栈内容,这个buffer大小根据具体的需要进行申请,因此可以节省内存。libco在进行协程切换的时候,先把共享栈的内容复制到要换出的协程实体的结构体buffer中,再把即将换入的协程实体的结构体中的buffer内容复制到共享栈中。这种方法是多个协程共用一个栈,但缺点是在协程切换的时候需要拷贝已使用的栈空间。

3.3.4 线程环境

  • stCoRoutineEnv_t是一个非常关键的结构,是一个线程内所有协程共享的结构。其中存放了一些协程调度相关的数据,当然叫调度有些勉强,因为libco实现的非对称式协程实际上没有什么调度策略,完全就是协程切换会用到。
1
2
3
4
5
6
7
8
9
10
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 协程的调用栈
int iCallStackSize; // 调用栈的栈顶指针
stCoEpoll_t *pEpoll; // epoll的一个封装结构

//for copy stack log lastco and nextco
stCoRoutine_t* pending_co; // 目前占用共享栈的协程
stCoRoutine_t* ocupy_co; // 与pending在同一个共享栈上的上一个协程
};
  • pCallStack : 如果将协程看成一种特殊的函数,那么这个 pCallStack 就时保存这些函数的调用链的栈。非对称协程最大特点就是协程间存在明确的调用关系;甚至在有些文献中,启动协程被称作 call,挂起协程叫 return。非对称协程机制下的被调协程只能返回到调用者协程,这种调用关系不能乱,因此必须将调用链保存下来。
  • pending_coocupy_co:对上次切换挂起的协程和嵌套调用的协程栈的拷贝,为了减少共享栈上数据的拷贝。在不使用共享栈模式时 pending_coocupy_co 都是空指针。(大概就是有的情况下共享栈中上次留下来的数据和现在将要重新写入的是一样的,可以省略恢复过程)

3.3.5 协程属性

  • 协程属性的结构体stCoRoutineAttr_t标记了栈的大小和是否使用共享栈。
1
2
3
4
5
6
7
8
9
10
struct stCoRoutineAttr_t
{
int stack_size; // 协程的私有栈或者共享栈大小
stShareStack_t* share_stack; // 指向协程的共享栈
stCoRoutineAttr_t()
{
stack_size = 128 * 1024;
share_stack = NULL;
}
}__attribute__ ((packed));

3.4 协程的基本操作

参考文献:

libco源码阅读(三):协程的创建和运行

libco源码阅读(四):协程的上下文环境

libco 分析(上):协程的实现

  • 协程的基本操作不外乎创建、恢复运行、挂起切换,这三个操作分别是由co_create, co_resume, co_yield三个函数完成的。

3.4.1 协程创建

1
2
3
4
5
6
7
8
9
10
int co_create(stCoRoutine_t **ppco, const stCoRoutineAttr_t *attr, pfn_co_routine_t pfn, void *arg)
{
if (!co_get_curr_thread_env())
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env(co_get_curr_thread_env(), attr, pfn, arg);
*ppco = co;
return 0;
}
  • ppco:是协程的主体结构,存储着一个协程所有的信息;
  • attr:其实和线程一样,是我们希望创建的协程的一些属性,不过libco中这个参数简单一点,只是标记了栈的大小和是否使用共享栈,传入为NULL时表示不使用共享栈;
  • pfn:是我们希望协程执行的函数,当然实际执行的是一个封装后的函数;
  • arg:是传入函数的参数。

co_get_curr_thread_env

  • 用于获取当前线程绑定的协程环境(这里暗含一个线程下的所有协程都是一起管理的,不会出现两个管理单元)。为了保证每个线程下运行时都能拿到指定的stCoRoutineEnv_t,是使用线程私有化实现的。

co_init_curr_thread_env

  • 如果发现当前还没有设置过stCoRoutineEnv_t,则说明是第一次创建此线程下的协程,这个函数用于初始化线程的环境变量,即初始化stCoRoutineEnv_t这个结构,并且创建一个主协程,主协程是线程环境栈中的第一个协程,该协程不执行任何函数。
  • 会完成几个关键的任务:初始化stCoRoutineEnv_t;调用co_create_env创建主协程实体;并且将其放入协程调用栈中env->pCallStack;创建epoll对象(本质上libco还是为网络IO服务的,所以在设计上就是和epoll绑定的)

co_create_env

  • 真正创建stCoRoutine_t对象的函数,主要是初始化对象并且分配私有栈或者公共栈

3.4.2 协程运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;

/* 获取线程环境中的栈顶协程实体 */
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];

if( !co->cStart ) // 如果协程没有执行过resume
{
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 把当前协程压入线程环境的栈中
co_swap( lpCurrRoutine, co ); // 进行两个协程的上下文切换
}
  • 大概流程就是从env->pCallStack调用栈中查找到当前协程,然后把要恢复运行的协程写入到调用栈中,最后调用co_swap完成协程的切换。(如果这个这个协程是第一次运行需要我们通过coctx_make创建上下文)

coctx_make

  • 要理解初始化上下文函数的工作需要结合coctx_swap的工作原理,在libco中实现的协程切换是建立在模仿函数调用和返回上实现的,可以注意coctx_swap的最后一条指令是ret,所以为了在coctx_swap返回时跳转到目标函数位置,只需要提前将目标函数入口地址填到sp指向的位置即可。
  • 当然除了去设置跳转位置外,还需要设定好栈的位置以及入参等,所以这个函数的本质就是初始化coctx_t,后面调用coctx_swap时会将存储在coctx_t中的数据恢复到寄存器中。

coctx_swap

  • 首先coctx_swap的定义是void coctx_swap(coctx_t*, coctx_t*),两个入参会被放置rdi(第一个参数)和rsi(第二个参数)
  • 实际在调用的时候是co_swap( lpCurrRoutine, co ),即rdi存当前协程的ctxrsi存将要切换到的协程的ctx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
coctx_swap:
leaq (%rsp),%rax ; 将栈指针 %rsp 中存储的地址加载到 %rax 中,注意是寄存器中值的copy
movq %rax, 104(%rdi) ; regs[13]: rsp
movq %rbx, 96(%rdi) ; regs[12]: rbx
movq %rcx, 88(%rdi) ; regs[11]: rcx
movq %rdx, 80(%rdi) ; regs[10]: rdx
movq 0(%rax), %rax ; 这一句就是将 %rsp 指向的内存中的返回地址取出并存储到 %rax 中
movq %rax, 72(%rdi) ; regs[9]: ret
movq %rsi, 64(%rdi) ; regs[8]: rsi
movq %rdi, 56(%rdi) ; regs[7]: rdi
movq %rbp, 48(%rdi) ; regs[6]: rbp
movq %r8, 40(%rdi) ; regs[5]: r8
movq %r9, 32(%rdi) ; regs[4]: r9
movq %r12, 24(%rdi) ; regs[3]: r12
movq %r13, 16(%rdi) ; regs[2]: r13
movq %r14, 8(%rdi) ; regs[1]: r14
movq %r15, (%rdi) ; regs[0]: r15
xorq %rax, %rax ; 清空 %rax

movq 48(%rsi), %rbp ; 后半部分其实就是将第二个参数coctx中缓存的数据重新加载到寄存器中
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 72(%rsi)

movq 64(%rsi), %rsi
ret
  • 注意x86_64架构的CPU其实是有很多寄存器的,但是这里其实只存储了一部分的寄存器值,这是和寄存器的使用规则有关系:如果寄存器遵循被调用者使用规则,那么被调用的函数内部如果需要使用这些寄存器就需要自己去缓存;如果寄存器遵循调用者使用规则,那么调用方要决定在调用前是否需要提前缓存这部分数据。

    1. %rax 作为函数返回值使用。
    2. %rsp 栈指针寄存器,指向栈顶
    3. %rbp 栈桢指针,指向栈基
    4. %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数……
    5. %rbx,%r12,%r13,%14,%15 用作数据存储,遵循被调用者使用规则,简单说就是随便用,调用子函数之前要备份它,以防他被修改
    6. %r10,%r11 用作数据存储,遵循调用者使用规则,简单说就是使用之前要先保存原值
    7. %rip: 相当于PC指针指向当前的指令地址,指向下一条要执行的指令
  • 所有寄存器中%rax, %r10, %r11是永远不需要被调用函数内部做缓存的,如果需要缓存是在调用函数之前就缓存了;而其他的寄存器如果函数内需要使用,则函数内要自己缓存。

co_swap

  • 这个函数是co_resume中直接调用的函数,除了在内部调用coctx_swap之外还需要在切出之前和切回之后完成保存栈和恢复栈两个工作。

3.4.3 协程挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void co_yield_env( stCoRoutineEnv_t *env )
{

stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];

env->iCallStackSize--;

co_swap( curr, last);
}

void co_yield_ct()
{

co_yield_env( co_get_curr_thread_env() );
}
void co_yield( stCoRoutine_t *co )
{
co_yield_env( co->env );
}
  • 挂起当前协程,并切换到协程栈中的上一个协程,这里的上一个协程其实是恢复当前要挂起的这个协程的协程,每当执行co_resume这个函数的时候,是会将恢复的协程写入到这个协程栈中的。
  • 挂起操作会将当前协程从这个协程栈中pop出来。
  • 这里协程栈的操作涉及到了协程实现方案的问题,前面有提到libco实现的协程是非对称的,即有栈协程。协程之间的调用关系是需要协程栈保存下来的,也就意味着协程挂起进行切换的时候必须恢复到调用方;
  • 还有一种协程的实现是对称的,即无栈协程。那么所有协程之间是平等的,相互之间可以任意进行切换。

3.5 协程调度

3.5.1 协程间调度框架

  • 协程是构建在用户态的一种机制,所有协程的恢复运行和挂起操作都是需要显示在代码中调用相关函数才能实现的,这一点相较于线程是有很大不同的,同时一个协程如果一直不让出CPU,也不会有时间片机制让别的协程有机会开始执行。
  • 因此在代码中是需要存在一个调度器来在合适的时候让指定的协程恢复运行的,也就是根据事件触发对应协程的恢复运行,这就是libco这个协程框架的基本调度思路了。
  • 题外话:因为协程的设计本来就是为了解决网络编程中的问题,所以libco的核心是围绕多路复用epoll以及网络套接字socket来实现的。libco实现了对相关的网络编程接口的hook,在hook函数中添加co_yield以及一些封装,比如将epoll封装成poll来对外提供服务,等等。
  • libco总共支持三种事件:网络事件、超时事件、同步事件。调用libco为我们提供的特定接口并需要进入等待时会自动注册这些事件,然后将当前协程挂起并等待主协程处理事件以及调度。主协程调度功能是由co_eventloop函数支持的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );

for(int i=0;i<ret;i++)
{
item->pfnPrepare( item,result->events[i],active );
}

TakeAllTimeout( ctx->pTimeout,now,timeout );

Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

lp = active->head;
while( lp )
{
lp->pfnProcess( lp );
lp = active->head;
}
}
}
  • 这里只保留了核心逻辑代码,首先co_eventloop函数是一个无限循环,不断等待新的事件到达,然后根据到达的事件恢复运行对应的协程。
    • 第一步是调用co_epoll_wait获取满足条件的网络事件,其实内部就是调用epoll_wait获取指定事件已经到达的socket fd。注意这里设置的超时时间是1 ms,这是利用超时机制实现超时事件的触发。
    • 第二步是对已经准备好的socket fd调用提前设定的pfnPrepare函数,这个和一些统计工作有关系,不是核心逻辑。
    • 第三步是通过TakeAllTimeout获得已经超时的事件,这里就是简单对比时间完成的。
    • 第四步会将超时事件和网络事件两个链表合并,然后依次执行提前注册的回调函数pfnProcess,其实回调函数很简答,就是恢复事件绑定的协程,即内部调用co_resume
  • 以上就是主要逻辑了,会发现缺失了同步事件的处理,这和同步事件的实现有关系。因为条件变量的触发需要另外一个协程在执行流程中调用co_cond_signal完成。这里会完成将该事件激活并放到pstActiveList中。那么下一次co_eventloop的循环就会一并将该事件处理。
  • 分析源码后我们便清楚了,libco的调度就是建立在epoll之上的,基于事件的一个模型。因此我们使用libco框架写协程代码的时候是必须在主协程(也就是原线程执行流程)的最后调用co_eventloop函数的。在此之前应该把协程创建好并且先运行一些,否则一个刚创建的协程没有等待任何事件,永远不会被调度到。当然一个协程内部也是可以再创建新的协程的,但也是同样的道理,创建完之后要先运行。

3.5.2 Timeout事件

  • co_eventloop的第一个参数是stCoEpoll_t,这就是辅助调度的结构体,所以只有一份。其内部的stTimeout_t存储了所有超时等待事件,是用来查询超时事件的结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;

struct stTimeout_t *pTimeout;

struct stTimeoutItemLink_t *pstTimeoutList;

struct stTimeoutItemLink_t *pstActiveList;

co_epoll_res *result;

};

struct stTimeoutItemLink_t
{
stTimeoutItem_t *head;
stTimeoutItem_t *tail;
};

struct stTimeout_t
{
stTimeoutItemLink_t *pItems; // 链表头的数组(有多个链表,然后以数组形式存储了所有的链表头)
int iItemSize; // 数组大小

unsigned long long ullStart; // 记录时间
long long llStartIdx; // 记录时间所对应的数组中的位置
};
  • 具体存储超时事件的结构体是stTimeoutItem_t,这是一个双向链表的节点,较为关键的是void *pArg,以及两个回调函数。前者其实stCoRoutine_t*,我们要在回调函数中恢复对应协程运行需要从这里拿到协程。
  • 回调函数是核心,当co_eventloop拿到已触发的事件了后续该做的事(一般就是恢复对应协程运行)都定义在回调函数中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct stTimeoutItem_t
{

enum
{
eMaxTimeout = 40 * 1000 //40s
};
stTimeoutItem_t *pPrev;
stTimeoutItem_t *pNext;
stTimeoutItemLink_t *pLink;

unsigned long long ullExpireTime;

OnPreparePfn_t pfnPrepare; // 预处理回调函数,在epoll_loop中被调用,只有epoll_wait触发的事件会调用这个回调函数,超时和同步事件都不会调用
OnProcessPfn_t pfnProcess; // 正式的回调函数,在epoll_loop中被调用

void *pArg; // routine 指向协程实体
bool bTimeout;
};
  • 提一下TakeAllTimeout函数,其实就是更新stTimeout_t中的时间记录,然后把已经超时的stTimeoutItem_t取出来。另外将其添加到pstTimeoutList中,等待后面统一处理。
  • 还有注册超时事件的AddTimeout函数,就是把stTimeoutItem_t添加到 到期时间 对应的链表中。

3.5.3 Poll事件

  • libco对网络编程需要用到的接口都做了相应的hook,关键接口主要有三类:read, write, poll,读写都是对一个socket fd的操作,所以一个事件对应一个协程,用stPollItem_t表示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct stPollItem_t : public stTimeoutItem_t
{
struct pollfd *pSelf;
stPoll_t *pPoll;

struct epoll_event stEvent;
};

struct stPoll_t : public stTimeoutItem_t
{
struct pollfd *fds;
nfds_t nfds; // typedef unsigned long int nfds_t;
stPollItem_t *pPollItems;
int iAllEventDetach;
int iEpollFd;
int iRaiseCnt;
};
  • 因为网络事件往往是存在超时时间的,网络事件一般是丢到epoll中内核自己维护,但是超时机制需要我们自己来管理。所以网络事件必须同时也是超时事件,被丢到stTimeout_t中管理。所以stPollItem_t是继承实现的,一些别的变量是处理pollepoll转换的。(小纠正:其实stPollItem_t是不会被当作超时任务丢到stTimeout_t中的,真正丢进去的是stPoll_t,但这里依然选择继承的原因是能用上回调函数等变量)
  • 还存在一种接口是对poll的处理,这个接口是需要同时等待多个网络事件,但是这多个网络事件和在一起等待一个超时事件。所以用了stPoll_t存储这一组stPollItem_t(数组pPollItems[nfds]),同时本身又继承stTimeoutItem_t,于是可以被当作是一个超时事件丢到stTimeout_t中管理。
  • poll还有一个特点是,当任意一个事件触发之后,同一批等待的所有事件都应该退出,这一点和epoll的实现是不同的。而stPollItem_t中注册的pfnPrepare预处理函数就是解决这个问题的,当一个stPollItem_t事件到达之后,需要将注册的stPoll_t超时任务删除。
  • 关于网络事件的注册,libco库的实现方案是统一使用hook后的poll函数完成注册。这样可以尽量做到用户代码的无侵入性。所有读写接口的hook内部也是先调用poll等到事件到达之后开始真正执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
//1.struct change
stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));

//2. add epoll
for(nfds_t i=0;i<nfds;i++)
{
int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );
}

//3.add timeout
int ret = AddTimeout( ctx->pTimeout,&arg,now );
int iRaiseCnt = 0;

co_yield_env( co_get_curr_thread_env() );

//4.clear epoll status and memory
for(nfds_t i = 0;i < nfds;i++)
{
int fd = fds[i].fd;
co_epoll_ctl( epfd,EPOLL_CTL_DEL,fd,&arg.pPollItems[i].stEvent );
}
return iRaiseCnt;
}
  • poll函数内部核心是co_poll_inner,其完成三个主要步骤:
    • 首先创建stPoll_t对象,并填充对应参数;
    • 然后把入参的fds依次添加到epoll中;
    • 接着根据超时时间,将stPoll_t添加到超时事件中,并让出CPU等待事件触发之后回来;
    • 最后在等到事件到达恢复运行之后,需要将这些fdsepoll中删除,本质上是为了契合poll接口的定义。

3.5.4 Cond事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct stCoCondItem_t 
{
stCoCondItem_t *pPrev;
stCoCondItem_t *pNext;
stCoCond_t *pLink;

stTimeoutItem_t timeout;
};

struct stCoCond_t
{
stCoCondItem_t *head;
stCoCondItem_t *tail;
};
  • 其实实现很简单,对同一个条件变量stCoCond_t的等待co_cond_timedwait会创建一个stCoCondItem_t并插入到等待链表中。可以注意到这里stTimeoutItem_t是作为成员变量出现的,而不是和之前的stPollItem_t保持一致采用继承的方法。显然同步事件是后来添加的新功能😆
  • co_cond_signal或者co_cond_broadcast触发等待条件变量的协程,就是把stTimeoutItem_t取出来,并添加到pstActiveList当中,注意这里是用的尾插法AddTail,所以在本轮co_eventloop调度中是可以被调度运行到的。(但我不明白为啥不就地恢复运行,而一定要让co_eventloop去调度。或许是有公平性问题,先来的事件应该先运行?)

协程库
https://lluvialuo.github.io/2024/05/12/协程库/
作者
Lluvia Luo
发布于
2024年5月12日
许可协议