
协程是用同步的编程方式达到异步的性能的框架。
现在较为成熟的框架有go语言实现的libgo和C++实现的libco。
同步为什么性能低?因为检测IO与读写IO在一个流程,当IO未就绪时需要进行等待,而异步的话由于不在一个流程,所以不需要等待。
但是,异步的流程不符合人的思维,而同步的好处就是逻辑清晰。
这里专门提示一下,mysql的连接池是同步 *** 作。
协程的实现主要是yield()让出CPU和resume()恢复运行。
有三种方式:
1、longjmp和setjmp,这个之前讲过,不再赘述。
2、linux提供的ucontext。
3、用汇编语言自己实现跳转。
相比1、2方法而言,此方法可读性较强。
利用switch()原语。
yield=switch(a,b);
resume=switch(a,b);
切换_switch()函数定义
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
参数 1:即将运行协程的上下文,是寄存器列表。
参数 2:正在运行协程的上下文,是寄存器列表。
其实协程的切换就是将CPU中各寄存器的值(上下文)进行更换,将正在运行中的CPU寄存器中的值保存到内存中,再将要运行的协程对应的寄存器值从内存拷贝到寄存器中。
我们 nty_cpu_ctx 结构体的定义,为了兼容 x86,结构体项命令采用的是 x86 的寄存器名字命名。
typedef struct _nty_cpu_ctx
{
void *esp;
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
这里介绍两个寄存器。
eip指向入口函数func,协程根据这一指针执行任务,函数的参数放到对应位置即可。
esp栈指针,指向栈空间的首地址。
_switch 返回后,执行即将运行协程的上下文,实现了上下文的切换。
1、还是一个CPU在运行,效率为什么提高了?
因为减少了阻塞等待的时间,使得性能与异步差不多。
2、如何知道IO已经就绪?使用epoll
epoll_ctl(add);
yield();
epoll_ctl(del);
在需要进行IO *** 作的地方执行yield()让出CPU,但是在让出之前先将fd加入epoll进行监听,当IO就绪后恢复运行,注意需要把epoll中的该节点删除。
3、与reactor的对比。
reactor是一种回调的方式,是异步 *** 作,没有协程直白,性能都差不多。
当然,reactor自己实现就可以,而协程却需要依赖库。
4、有栈协程与无栈协程。
区别在于有没有自己独立的栈空间。
有栈协程,每个协程都有自己独立的栈空间。
无栈协程,所有协程共享栈空间,需要计算某一个协程在栈的某个位置。
不推荐使用无栈协程,因为栈的管理复杂,实现复杂。
虽然无栈协程内存利用率较高,但是有栈协程的效率更高且实现容易,而且内存利用率也没有低多少,还可以根据需求自己定义栈空间的大小。
比如,如果服务器时传输文件或是较大的数据,栈开得大一些,1M,甚至10M都可以;如果只是单独的接收一些字符数据,就可以开小一些,1K、4K都可以。
所以,两者性能、利用率其实差不多,建议使用有栈协程。
5、协程相比于多线程的长处。
协程可读性强,实现相对简单。
对于多线程而言,如果同时 *** 作同一个fd,会很复杂,例如加锁之类。
因为如果一个线程正在用这个fd发送数据,而另一个线程直接关闭了这个fd,会出错。
6、协程一直不让出怎么办?
协程一直不让出,说明协程一直在进行处理的是一个计算密集型的任务。
协程是为了解决IO等待时挂起问题的框架,如果一直不让出,说明没有这种IO *** 作,此时用协程的意义就不大,不建议使用协程,可以用rpc。
7、协程的应用
一是文件 *** 作,如日志落盘。
二是对mysql等数据库的 *** 作。
三是网络IO。
一个协程处理一个IO,这样实现较为容易。
当然也可以处理多个IO,但是实现复杂。
typedef struct _nty_coroutine
{
nty_cpu_ctx ctx; //协程的上下文
proc_coroutine func; //协程的入口函数
void *arg; //入口函数的参数
size_t stack_size; //栈空间大小
nty_coroutine_status status; //协程的状态
nty_schedule *sched; //协程调度器
uint64_t id; //协程id
void *stack; //栈指针
RB_ENTRY(_nty_coroutine) sleep_node; //sleep集合
RB_ENTRY(_nty_coroutine) wait_node; //wait集合
TAILQ_ENTRY(_nty_coroutine) ready_next; //就绪集合
} nty_coroutine;
协程创建完成后,加入到就绪集合,等待调度器的调度。
协程在运行完成后,进行 IO *** 作,此时 IO 并未准备好,进入等待状态集合。
IO 准备就绪,协程开始运行,后续进行 sleep *** 作,此时进入到睡眠状态集合。
就绪(ready),睡眠(sleep),等待(wait)集合该采用如何数据结构来存储?
就绪(ready)集合并不没有设置优先级的选型,所有在协程优先级一致,所以可以使用队列来存储就绪的协程,简称为就绪队列(ready_queue)。
1、查看调度器是否存在,不存在则创建。 调度器作为全局的单例,将调度器的实例存储在线程的私有空间 pthread_setspecific。 先给出调度器的定义 yield()和resume()的参数由此而来 sleep集合中如果key相同,可以给后加入的节点key值稍加一点点(如1毫秒),这样就可以做到红黑树key值唯一,且没有太大影响,因为一般协程的实时性要求不会那么高。 当然,也可以不对key进行处理,直接插在相同key值节点的右子树上也可以。 简单介绍一下运行过程,调度器遍历各个集合,从中找出IO已就绪的协程进行resume(),该协程在运行中,IO如果未准备好,再yield()让回到调度器。 所以,协程在运行的过程中,大量的时间是运行在调度器上的。 运行的过程图如下 主要是用多进程或多线程实现,每个进程或线程亲和一个CPU。 每个进程内部都是单线程,每个进程一个调度器,好处是实现简单,对于协程代码不需要做过多修改。 所有线程共用一个调度器,需要对调度器加锁,锁定义在调度器中(红黑树用mutex,队列用spinlock,遍历用mutex),调度时加锁(如查找sleep红黑树中节点,要对sleep红黑树加锁),需要对协程本身的代码进行修改,实现较为复杂。 当然,如果加锁,要考虑死锁的问题。 其实,这里不太需要注意。 因为如果不掺杂业务,线程共用的就一个东西——调度器,不太可能会产生死锁。 这里不单独讲解。 协程需要封装若干接口,一类是协程本身的,二类是 posix 的异步封装协程 API。 1、协程创建 2、启动协程调度器 3、协程休眠 这里要注意,有些接口要做成异步的,有些接口不需要。 这些接口需要判断IO是否就绪,不成功就等待。 这些接口不会引起不正常的关系,但是可以做成非阻塞的。 有两种封装方法。 一是自己另外定义一套接口,如__read()。 这种方法有个弊端,对于mysql等这样的组件,它们内部源码调用了原来系统的接口(如read()),这样就需要修改mysql这些组件的源码。 这里有两个宏定义, 这个过程就相当于在调用系统接口时被截获了,调用了自己实现的函数,原来的函数变成了返回值。 这样达到的效果,在使用协程连接mysql时,可以不用改动mysql的源码就实现这些功能。 在malloc()、free()时加一个打印 *** 作,就可以发现什么地方有malloc没有free。 jemalloc和tcmalloc就可以这样,不需要改系统代码就可以用。 协程到底是解决什么问题的?同步改成异步怎么做?同步、异步的性能差异?yield、resume、调度、切换?协程API接口?hook?掌握了这些,协程的大概原理就清楚了。 欢迎分享,转载请注明来源:内存溢出
睡眠(sleep)集合需要按照睡眠时长进行排序,采用红黑树来存储,简称睡眠树(sleep_tree)红黑树在工程实用为
等待(wait)集合,其功能是在等待 IO 准备就绪,等待 IO 也是有时长的,所以等待(wait)集合采用红黑树的来存储,简称等待树(wait_tree),此处借鉴 nginx 的设计。
2、分配一个 coroutine 的内存空间,分别设置 coroutine 的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。
3、将新分配协程添加到就绪队列 ready_queue 中。
协程的调度
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
{
assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL)
{
nty_schedule_create(0);
sched = nty_coroutine_get_sched();
if (sched == NULL)
{
printf("Failed to create scheduler\n");
return -1;
}
}
nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
if (co == NULL)
{
printf("Failed to allocate memory for new coroutine\n");
return -2;
}
int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
if (ret)
{
printf("Failed to allocate stack for new coroutine\n");
free(co);
return -3;
}
co->sched = sched;
co->stack_size = sched->stack_size;
co->status = BIT(NTY_COROUTINE_STATUS_NEW);
co->id = sched->spawned_coroutines ++;
co->func = func;
co->fd = -1;
co->events = 0;
co->arg = arg;
co->birth = nty_coroutine_usec_now();
*new_co = co;
TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); //尾插
return 0;
}
typedef struct _nty_schedule
{
nty_cpu_ctx ctx; //当前协程的上下文
struct _nty_coroutine *curr_thread; //当前运行的协程。while (1)
{
//遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != 0)
{
resume(expired);
}
//遍历等待集合,使用 resume 恢复 wait 的协程运行权
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
for (i = 0;i < nready;i ++)
{
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}
// 使用 resume 恢复 ready 的协程运行权
while (!TAILQ_EMPTY(sched->ready))
{
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);
void nty_schedule_loop(void);
posix接口
void nty_sleep();
需要做成异步的接口有accept()、connect()、send()、write()、sendto()等。
不需要做成异步的接口有socket()、close()、listen()等。
二是做成与系统一样的接口,使用hook。connect_f = dlsym(RLTD_NEXT, "connect");
dlsym()针对系统调用,dlopen()针对第三方库。
使用hook之后,虽然函数中仍然还是使用的原来的接口(connect),但是实际调用的是自己在应用层定义的函数入口。
另外,malloc()、free()也可以这么用,可以用来解决内存泄漏的问题。
写了不少,最后提示一下协程的几个关键问题。
微信扫一扫
支付宝扫一扫
评论列表(0条)