OR最大的特点就是,将Lua协程与Nginx事件驱动模型及非阻塞I/O结合起来。使用户可以在handler中使用 **同步但是依然是非阻塞** 的方式编写其应用代码,而无需关心底层的协程调度以及与Nginx事件驱动模型的交互。
本文将先从总体上介绍OR的协程调度机制,然后结合源码以及Lua栈的情况来详细了解各个部分是如何实现的,包括其异常保护、协程初始化、协程的恢复和执行、协程的挂起、协程的执行结束、协程出错的情况。

OpenResty(后面简称:OR)是一个基于Nginx和Lua的高性能Web平台,它内部集成大量的Lua API以及第三方模块,可以利用它快速搭建支持高并发、极具动态性和扩展性的Web应用、Web服务或动态网关。

OR最大的特点就是,将Lua协程与Nginx事件驱动模型及非阻塞I/O结合起来。使用户可以在handler中使用 同步但是依然是非阻塞 的方式编写其应用代码,而无需关心底层的协程调度以及与Nginx事件驱动模型的交互。

本文将先从总体上介绍OR的协程调度机制,然后结合源码以及Lua栈的情况来详细了解各个部分是如何实现的,包括其异常保护、协程初始化、协程的恢复和执行、协程的挂起、协程的执行结束、协程出错的情况。

本文主要关注调度函数内部的逻辑,如果想了解外部的调用流程。可以参看Openresty Lua钩子调用完整流程

注:lua-nginx模块与stream-lua-nginx模块的主体部分类似,后者实现相对简单一点。下面的讨论将基于stream-lua模块。

为了防止歧义,文中用到的一些术语明确一下:

  • 主线程:表示外层调用run_thread()的OS线程
  • 入口线程:每个handler被调用时会创建一个入口线程,用于执行lua代码
  • 用户线程:用户在Lua代码中通过ngx.thread.spawn()创建的线程
  • 用户协程:用户在Lua代码中通过coroutine.create()创建的协程
  • 协程:泛指所有协程,包括入口线程、用户线程和用户协程
  • vm:表示Lua虚拟机
  • L:视出现的上下文,一般表示父协程,在创建入口线程的时候表示Lua VM
  • co:一般表示新创建的协程
  • L栈: |协程表|新协程|顶|:表示Lua栈结构,最右边是栈顶

在深入了解协程调度机制之前,我们先来认识一下主要的数据结构:

  • 协程上下文:ngx_stream_lua_co_ctx_t
    • 协程内部栈(coctx->co
    • 协程状态(coctx->co_status
    • 维护协程之间关系的数据(父协程coctx->parent_co_ctx、僵尸子线程coctx->zombie_child_threads
    • 用户相关数据(coctx->data
    • 在Lua的registry表中对应该线程指针的引用值(co_ref
    • 一些状态标记(是否是用户线程is_uthread、是否因创建新线程thread_spawn_yielded被yield)
  • 模块上下文:ngx_stream_lua_ctx_t
    • ctx->cur_co_ctx(当前调度协程上下文)
    • ctx->co_op(协程是以何种方式YIELD)
  • 核心调度函数:ngx_stream_lua_run_thread()

首先你可能很好奇OR为什么要在C引擎层面自己实现协程的调度?或者说这么做的好处是什么?我觉得最主要的原因还是减轻开发者的负担。

我们知道Lua是个非常轻巧的语言,它不像Go有自己的调度器。Lua原生的对协程的操作无非就是coroutine.resume()coroutine.yield()。这两者是成对出现的,协程coroutine.yield()之后肯定回到父协程coroutine.resume()的地方,恢复子协程需要显式再次coroutine.resume()。如果要在Lua代码层面实现非阻塞I/O,那么父协程必须处理子协程I/O等待的情况,并在事件发生时恢复子协程的执行。如果需要同时进行多个任务,那么父协程就需要负责多个协程间的调度。因为协程的拓扑可能是一个复杂的树状结构,所以协程的调度管理将变得异常复杂。

OR在C引擎层帮我们把这些事情都做了,你无须再关心所有这些,只需专心写你的业务逻辑。为了支持同步非阻塞的方式编写应用代码,OR重写了coroutine的接口函数,从而接管了协程的调度,并在coroutine基础上封装抽象出了thread的概念。无论是coroutine还是thread,I/O等待对于用户都是透明的,用户无需关心。两者的主要区别是,coroutine父子之间的协作度更高,coroutine.yield()coroutine.resume()成对出现。在子协程执行完成(出错)或者显式coroutine.yield()之前,父协程一直处于等待状态。而thread则由调度器进行调度,子thread一旦开始执行就不再受父协程控制了,在需要并发请求时很有用。thread提供了spawn()wait()等接口,spawn()执行参数中指定的函数,直到执行完毕、出错或者I/O等待时返回。wait()则使父协程可以同步等待子线程执行完毕、获取结果。

OR在对协程调度上,最核心的改动是其创建新协程时的行为(coroutine.resume(), ngx.thread.spawn())。它不会直接调用lua_resume(),而是先lua_yield()回到主线程,然后由主线程再根据情况lua_resume()下一个协程。Lua代码域内从来不会直接调用lua_resume(),理解了这一点你就理解了OpenResty协程调度的精髓。

所以OR中协程拓扑是一个单层的结构,它只有一个入口点。这样使得协程调度更加灵活,I/O事件的触发时回调函数也更容易实现。

OR调度器根据lua_resume()的返回值,确定协程是挂起了、结束了还是出错了。因为OR改动了创建新协程时行为,同时又抽象了thread概念,所以如果是协程挂起的情况,还需要知道是什么原因挂起,以便做相应的不同处理。是继续调度?还是返回上层?我们前面提到的ctx->co_op便是做这个用途。

协程的调度在核心调度函数ngx_stream_lua_run_thread()中进行,它是创建或恢复协程的唯一入口点。最初是由配置的Lua钩子调用(图中ssl_cert_handler()),如果碰到了I/O等待的情况,后续则由对应的事件handler(图中的sleep_handler()read_handler())再次拉起。run_thread()里面实现了一个调度循环,循环里面先从ctx->cur_co_ctx获取下一个待resume的协程上下文,然后lua_resume()执行或恢复该协程,其返回值LUA_YIELD表示协程挂起,0表示协程执行结束,其余的表示协程出错了。其中协程挂起又分为四种不同的情况:即等待I/O、新建thread、coroutine.resume()coroutine.yield()。根据不同的情况,决定是跳到循环前面继续恢复下一个协程,还是返回上层函数。

下图是协程调度主要逻辑的示意图,可以看到在Lua代码域中无论是新建、挂起或恢复协程,都是先调用lua_yield()回到主线程。I/O操作例如ngx.tcp.receive()如果碰到了I/O等待,会在内部注册epoll事件(对于sleep的情况是定时器),然后自动lua_yield(),当事件触发时继续未完成的I/O操作,完成之后再调用run_thread()恢复之前被挂起的协程。

openresty-lua-coroutine-schedule

作为一个调度器,OpenResty扮演者类似操作系统内核的角色,不过它的调度对象是Lua协程。作为一个“内核”,无论其调度对象出了什么问题,都不应该使这个系统崩溃,而是应该将错误信息打印出来。

Openresty内部就做了一个这样的异常保护,其原理就是用setjmplongjmp包住了run_thread()里面的整个协程调度逻辑。

  1. /* 首先注册虚拟机的panic回调 */
  2. lua_atpanic(L, ngx_stream_lua_atpanic);
  3. /* setjmp保存环境 */
  4. NGX_LUA_EXCEPTION_TRY {
  5. /* 执行调度逻辑 */
  6. } NGX_LUA_EXCEPTION_CATCH {
  7. /* 出现异常时走到这里 */
  8. dd("nginx execution restored");
  9. }

ngx_stream_lua_atpanic()的实现也非常简单,只是简单地打印崩溃日志,然后调用NGX_LUA_EXCEPTION_THROW(1);恢复nginx的执行。

  1. int
  2. ngx_stream_lua_atpanic(lua_State *L)
  3. {
  4. #ifdef NGX_LUA_ABORT_AT_PANIC
  5. abort();
  6. #else
  7. u_char *s = NULL;
  8. size_t len = 0;
  9. if (lua_type(L, -1) == LUA_TSTRING) {
  10. s = (u_char *) lua_tolstring(L, -1, &len);
  11. }
  12. if (s == NULL) {
  13. s = (u_char *) "unknown reason";
  14. len = sizeof("unknown reason") - 1;
  15. }
  16. ngx_log_stderr(0, "lua atpanic: Lua VM crashed, reason: %*s", len, s);
  17. ngx_quit = 1;
  18. /* restore nginx execution */
  19. NGX_LUA_EXCEPTION_THROW(1);
  20. /* impossible to reach here */
  21. #endif
  22. }

这几个宏定义分别如下:

  1. #define NGX_LUA_EXCEPTION_TRY \
  2. if (setjmp(ngx_stream_lua_exception) == 0)
  3. #define NGX_LUA_EXCEPTION_CATCH \
  4. else
  5. #define NGX_LUA_EXCEPTION_THROW(x) \
  6. longjmp(ngx_stream_lua_exception, (x))

ngx_stream_lua_new_thread()用于创建入口线程

OR中需要在Registry表中存储每个创建出来的Lua线程的reference,这个存储协程的表在Registry表中对应的key是全局变量ngx_stream_lua_coroutines_key的地址,因此下面这段代码就是从Registry表中查询这个储存协程的表,返回到栈顶:

  1. /* 返回栈顶元素的索引,等于栈中元素的个数 */
  2. base = lua_gettop(L);
  3. /* 将存储协程的表对应的key压栈 */
  4. lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
  5. coroutines_key));
  6. /* 将key出栈,获取Registry表中key对应的元素,然后将结果入栈 */
  7. lua_rawget(L, LUA_REGISTRYINDEX);

接下来创建一个新的协程,同时初始化其全局表:

  1. /* 创建Lua协程,返回的新lua_State跟原有的lua_State共享所有的全局对象(如表),
  2. 但是有一个独立的执行栈。 协程依赖垃圾回收销毁 */
  3. /* L栈: |协程表|新协程|顶| */
  4. co = lua_newthread(L);
  5. /* 创建该协程的全局表,设置_G field为全局表自己 */
  6. /* L栈: |协程表|新协程|协程新的全局表|顶| */
  7. ngx_stream_lua_create_new_globals_table(co, 0, 0);
  8. /* 再创建一个新表 */
  9. /* L栈: |协程表|新协程|协程新的全局表|新表|顶| */
  10. lua_createtable(co, 0, 1);
  11. /* 拿到全局表 */
  12. /* L栈: |协程表|新协程|协程新的全局表|新表|旧全局表|顶| */
  13. ngx_stream_lua_get_globals_table(co);
  14. /* 新表的__index的值为栈顶的值,也即就全局表 */
  15. /* L栈: |协程表|新协程|协程新的全局表|新表|顶| */
  16. lua_setfield(co, -2, "__index");
  17. /* 新表出栈,将其设为索引-2处即协程新的全局表的元表 */
  18. /* L栈: |协程表|新协程|协程新的全局表|顶| */
  19. lua_setmetatable(co, -2);
  20. /* 设置协程新的全局表到对应索引,其_G field是自己,
  21. 其元表是新表,新表的__index是父协程的全局表 */
  22. /* L栈: |协程表|新协程|顶| */
  23. ngx_stream_lua_set_globals_table(co);

这一块的逻辑有点绕,我们来稍微理一下,其实就是用新建的全局表替换了旧的全局表,其中新的全局表的_G字段是它自己,新全局表的元表中__index元方法是旧的全局表。

此时的Lua虚拟机栈顶情况如下图所示:

  1. L->top | 栈顶 |
  2. L->top - 1 |Lua_State*| 新创建的协程
  3. L->top -2 | Lua Table| 存储协程引用的表

下面一步就是在Lua虚拟机中为这个新协程创建一个reference:

  1. /* 为栈顶对象(即新协程),创建并返回一个协程表中的引用 */
  2. /* 当前栈: |协程表|顶| */
  3. *ref = luaL_ref(L, -2);
  4. if (*ref == LUA_NOREF) {
  5. lua_settop(L, base); /* restore main thread stack */
  6. return NULL;
  7. }

最后恢复堆栈

  1. /* 设置栈顶索引 */
  2. /* 当前栈: |顶| */
  3. lua_settop(L, base);
  4. return co;

以上步骤还只是创建了一个什么都不能做的Lua协程,回到_by_chunk()函数之后还需要把入口函数放入协程中。

  1. /* 将lua虚拟机VM栈上的入口函数闭包移到新创建的协程栈上,
  2. 这样新协程就有了虚拟机已经解析完毕的代码了。*/
  3. lua_xmove(L, co, 1);
  4. /* 拿到co全局表,放到栈顶 */
  5. /* 当前栈: |入口closure|全局表|顶| */
  6. ngx_stream_lua_get_globals_table(co);
  7. /* 将全局表设为入口closure的环境表 */
  8. /* 当前栈: |入口closure|顶|*/
  9. lua_setfenv(co, -2);

至此,协程入口函数以及环境表已经设置好。接下来就是让它能够运行起来,让调度器能够调度它运行:

  1. /* 将nginx请求保存到协程全局表 */
  2. ngx_stream_lua_set_req(co, r);
  3. ctx->cur_co_ctx = &ctx->entry_co_ctx;
  4. ctx->cur_co_ctx->co = co;
  5. ctx->cur_co_ctx->co_ref = co_ref;

接下来就是注册cleanup钩子,然后ngx_stream_lua_run_thread()

用户线程由ngx.thread.spawn()创建,对应的C实现是ngx_stream_lua_uthread_spawn()。首先它会调ngx_stream_lua_coroutine_create_helper()创建一个新的协程。

注意协程都是在worker的虚拟机上创建的(不考虑cache off的情况的话)。但是用户协程会继承父协程的全局表,其父子关系由OR进行维护。

  1. /* 获取虚拟机 */
  2. vm = ngx_stream_lua_get_lua_vm(r, ctx);
  3. /* 创建新协程 */
  4. co = lua_newthread(vm);
  5. /* 然后创建coctx,设置其co、co_status值 */
  6. coctx = ngx_stream_lua_create_co_ctx;
  7. coctx->co = co;
  8. coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;

此时父协程的栈如下:

  1. /* 当前栈: |entry_func|args|顶| */

接下来将父协程的全局表给新创建的协程:

  1. /* make new coroutine share globals of the parent coroutine.
  2. * NOTE: globals don't have to be separated! */
  3. /* 拷贝父协程的全局表到栈上 */
  4. /* L栈: |entry_func|args|全局表|顶| */
  5. ngx_stream_lua_get_globals_table(L);
  6. /* 将全局表移动到新创建的协程co的栈上 */
  7. /* L栈: |entry_func|args|顶| */
  8. lua_xmove(L, co, 1);
  9. /* 从新协程栈上写入其的全局表 */
  10. ngx_stream_lua_set_globals_table(co);
  11. /* 将新协程从进程虚拟机,移动到父协程中 */
  12. /* L栈: |entry_func|args|新协程|顶| */
  13. lua_xmove(vm, L, 1);
  14. /* 入口函数拷贝到L栈顶 */
  15. /* L栈: |entry_func|args|新协程|entry_func|顶|*/
  16. lua_pushvalue(L, 1);
  17. /* 将入口函数从L移到co栈中 */
  18. /* L栈: |entry_func|args|新协程|顶| */
  19. /* co栈: |entry_func|顶|*/
  20. lua_xmove(L, co, 1);

create_helper函数返回之后,L的栈顶是新协程,co的栈顶是入口函数。

ngx_stream_lua_coroutine_create_helper返回之后,进行uthread的初始化。

此时,父协程L是这样的:

  • 栈顶是新创建的协程
  • 然后是参数和入口函数

在此之前,先在registry表中保存一个该协程的ref。(到现在还没搞明白这个ref是干嘛用的?除了创建线程和删除线程,貌似只有检查线程是否活着的时候会查一下这个ref,只是检查状态用coctx->co_status不是也能做到么?8.12更新,之所以要把线程锚定到注册表上,是为了防止被当成垃圾回收。这也解释了为什么只有线程需要锚定到注册表上,而用户协程不需要。因为用户协程肯定由其父协程保留着一个引用。)

  1. /* anchor the newly created coroutine into the Lua registry */
  2. /* 把新创建的协程写入Lua registry表中 */
  3. /* 将ngx_stream_lua_coroutines_key的地址压入栈中 */
  4. lua_pushlightuserdata(L, &ngx_stream_lua_coroutines_key);
  5. /* 从registry表中获取协程表 */
  6. /* L栈: |entry_func|args|新协程|协程表|顶|*/
  7. lua_rawget(L, LUA_REGISTRYINDEX);
  8. /* 将新协程压栈 */
  9. /* L栈: |entry_func|args|新协程|协程表|新协程|顶|*/
  10. lua_pushvalue(L, -2);
  11. /* -2位置是注册表,为新协程创建在报表中的索引 */
  12. /* L栈: |entry_func|args|新协程|协程表|顶| */
  13. coctx->co_ref = luaL_ref(L, -2); //
  14. /* 弹出协程表 */
  15. /* L栈: |entry_func|args|新协程|顶| */
  16. lua_pop(L, 1);

接下来是初始化运行环境:

此时的,L的栈情况如下:

  1. |entry_func|参数1|...|参数n|新协程|
  2. 1 2 ... -2 -1
  1. if (n > 1) {
  2. /* 由于lua函数压栈顺序是从左到右
  3. * 因此1就是压入的第一个参数,而spawn的第一个参数就是入口函数
  4. * 把栈顶元素(即新协程)移动到1,覆盖入口函数,入口函数前面已经拷贝到新协程栈上了
  5. */
  6. /* L栈: |新协程|args|顶| */
  7. lua_replace(L, 1);
  8. /* 将参数移到新协程栈中 */
  9. /* L栈: |新协程|顶|*/
  10. /* co栈: |入口函数|args|顶| */
  11. lua_xmove(L, coctx->co, n - 1);
  12. }

设置状态,将父协程放入post_thread队列中,设置协程的父子关系,设置新协程为下一个调度的线程

  1. /* 设置状态 */
  2. coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
  3. ctx->co_op = NGX_STREAM_LUA_USER_THREAD_RESUME;
  4. ctx->cur_co_ctx->thread_spawn_yielded = 1;
  5. /* 将父协程放入post_thread队列中 */
  6. ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx)
  7. /* 保存子线程的父协程上下文为当前协程 */
  8. coctx->parent_co_ctx = ctx->cur_co_ctx;
  9. /* 切换当前协程为新创建的协程 */
  10. ctx->cur_co_ctx = coctx;

最后,spawn函数的返回值是新创建的协程

  1. /* 将原协程的执行权切换出去,这里的参数1表示栈上留了一个值,这里是指新创建的协程
  2. * 主线程并不会取这个值,而是等到新线程spawn返回时作为返回值。
  3. * 此时L栈中是新协程,co栈中是参数和入口函数。
  4. */
  5. return lua_yield(L, 1);

OR替换了原生的coroutine接口,当存在getfenv(0).__ngx_req时(全局环境保存了nginx请求),使用重写后的coroutine接口函数。

coroutine.create()创建新协程部分跟uthread是一样的,都是调用ngx_stream_lua_coroutine_create_helper()。Lua函数返回新协程。此时新协程栈中是入口函数。

coroutine.resume()用于开始或恢复新协程,其对应的C函数是ngx_http_lua_coroutine_resume()

  1. /* 首先,获取到协程 */
  2. /* L栈: |co|参数|, co栈: |入口函数| */
  3. co = lua_tothread(L, 1);
  4. /* 然后设置状态和父子关系 */
  5. /* 父协程为normal */
  6. p_coctx->co_status = NGX_HTTP_LUA_CO_NORMAL;
  7. coctx->parent_co_ctx = p_coctx;
  8. dd("set coroutine to running");
  9. /* 子协程为running */
  10. coctx->co_status = NGX_HTTP_LUA_CO_RUNNING;
  11. /* 设置co_op告知主线程yield类型 */
  12. ctx->co_op = NGX_HTTP_LUA_USER_CORO_RESUME;
  13. /* 设置下一个调度协程为新协程 */
  14. ctx->cur_co_ctx = coctx;

接下来,将控制权交还给主协程,并把参数传给主线程。

  1. /* 此时L栈: |co|参数|, co栈: |入口函数| */
  2. /* lua_gettop(L) - 1表示留在栈中的返回值个数,
  3. * 由主线程取用之后,在lua_resume新协程时传递 */
  4. /* 减一个,表示不传底下的co */
  5. return lua_yield(L, lua_gettop(L) - 1);

OR中协程的执行和恢复总是由主线程来进行,不管是coroutine.resume()还是ngx.thread.spawn(),都是先lua_yield()回到主线程之后,在主线程中lua_resume()

注意到前面创建阶段,thread是lua_yield(L, 1),coroutine是lua_yield(L, lua_gettop(L) - 1)。yield到主线程之后,我们继续看调度程序的处理。

先获取参数个数

  1. /* 因为入口函数和参数已经在新线程栈中了,所以从新协程中获取参数个数,-1是除掉入口函数 */
  2. nrets = lua_gettop(ctx->cur_co_ctx->co) - 1;

然后跳到主循环的前面,执行新线程

  1. /* 保存新协程coctx */
  2. orig_coctx = ctx->cur_co_ctx;
  3. /* 执行新线程,其中nrets为参数个数 */
  4. rv = lua_resume(orig_coctx->co, nrets);

lua_resume中就会开始新线程的执行。当新线程执行完毕或因I/O中断yield之后,会恢复父协程。在恢复父协程之前,先设置参数个数为1,即之前留在栈上的新协程co。恢复父协程之后,ngx.thread.spawn()函数就返回了。

  1. if (ctx->cur_co_ctx->thread_spawn_yielded) {
  2. ctx->cur_co_ctx->thread_spawn_yielded = 0;
  3. nrets = 1;
  4. }

同样是先获取参数个数

  1. /* 获取父协程 */
  2. old_co = ctx->cur_co_ctx->parent_co_ctx->co;
  3. /* 因为参数还在父协程栈中,所以从父协程栈中获取参数个数 */
  4. nrets = lua_gettop(old_co);
  5. if (nrets) {
  6. /* 将参数从父协程移到子协程 */
  7. lua_xmove(old_co, ctx->cur_co_ctx->co, nrets);
  8. }

此时子协程栈中是参数和入口函数。

然后跳到主循环的前面,执行新协程,跟前面uthread时一样。

协程的挂起分为两种情况:

  • 一种是内部在I/O等待时自动挂起,这种情况用户不用参与,OR会自动将相应的事件及其handler挂到事件驱动上,当事件被唤醒时继续未完成的I/O操作,完成之后由调度器恢复之前挂起的协程。
  • 另一种是用户在Lua代码主动调用coroutine.yield()挂起。此时由调度器根据情况决定执行下一个执行的协程。

我们先来看用户主动挂起的情况,coroutine.yield()对应的C函数为ngx_stream_lua_coroutine_yield()。我们先来看看它里面干了些什么。

  1. /* 首先修改当前协程的状态为挂起 */
  2. coctx = ctx->cur_co_ctx;
  3. coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;
  4. /* 设置co_op */
  5. ctx->co_op = NGX_STREAM_LUA_USER_CORO_YIELD;
  6. /* 如果不是用户线程(也即是普通coroutine),且有父协程,
  7. 将其父协程状态设置为running */
  8. if (!coctx->is_uthread && coctx->parent_co_ctx) {
  9. coctx->parent_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
  10. }
  11. /* 最后将控制权交还主线程,将所有yield参数传递给主线程 */
  12. return lua_yield(L, lua_gettop(L));

回到主线程之后,根据待挂起协程是thread还是corotine进行不同处理。

  1. if (ngx_stream_lua_is_thread(ctx)) {
  2. /* 丢弃coroutine.yield()的任何参数 */
  3. lua_settop(ctx->cur_co_ctx->co, 0);
  4. /* 因为thread由调度器负责调度,所以将当前线程的状态改为running,为什么不在前面一起改?*/
  5. ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
  6. /* 如果已经有pending的线程,则放到队列中 */
  7. if (ctx->posted_threads) {
  8. ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx);
  9. ctx->cur_co_ctx = NULL;
  10. return NULL;
  11. }
  12. /* 否则,立即恢复线程 */
  13. }
  1. /* 获取当前栈的高度,也即coroutine.yield()的参数个数 */
  2. nrets = lua_gettop(ctx->cur_co_ctx->co);
  3. /* 设置父协程为下一个调度的协程 */
  4. next_coctx = ctx->cur_co_ctx->parent_co_ctx;
  5. next_co = next_coctx->co;
  6. /* 将参数从子协程栈中移到父协程栈中 */
  7. if (nrets) {
  8. dd("moving %d return values to next co", nrets);
  9. lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
  10. #ifdef NGX_LUA_USE_ASSERT
  11. ctx->cur_co_ctx->co_top -= nrets;
  12. #endif
  13. }
  14. /* 如果不是wrap封装的,还要加一个true,作为第一个参数 */
  15. if (!ctx->cur_co_ctx->is_wrap) {
  16. /* prepare return values for coroutine.resume
  17. * (true plus any retvals)
  18. */
  19. lua_pushboolean(next_co, 1);
  20. /* 插入1的位置,作为第一个参数 */
  21. lua_insert(next_co, 1);
  22. nrets++; /* add the true boolean value */
  23. }
  24. ctx->cur_co_ctx = next_coctx;
  25. /* 回到主循环的前面,resume父协程 */
  26. break;

I/O等待的场景有很多,不过其背后的原理都差不多:

  • 定义一个事件,设置恢复时的handler及对应协程上下文,然后lua_yield()回到run_thread()
  • 主线程将ctx->cur_co_ctx设为空之后,直接返回NGX_AGAIN,如果有posted_thread会继续执行,否则将控制权交还给nginx层
  • 后续当事件发生时,继续未完成的操作,完成之后将保存的协程上下文设为ctx->cur_co_ctx,然后调用ngx_stream_lua_run_thread()恢复协程的执行。

这里举两个典型的例子:

它的C函数实现是ngx_stream_lua_ngx_sleep(),先定义设置好handler和coctx,挂上定时器,然后lua_yield()

  1. ngx_stream_lua_cleanup_pending_operation(coctx);
  2. coctx->cleanup = ngx_stream_lua_sleep_cleanup;
  3. coctx->data = r;
  4. /* 保存恢复时的handler和协程上下文 */
  5. coctx->sleep.handler = ngx_stream_lua_sleep_handler;
  6. coctx->sleep.data = coctx;
  7. coctx->sleep.log = r->connection->log;
  8. /* 当delay为0时,放入post_event队列或添加定时器 */
  9. if (delay == 0) {
  10. #ifdef HAVE_POSTED_DELAYED_EVENTS_PATCH
  11. dd("posting 0 sec sleep event to head of delayed queue");
  12. coctx->sleep.delayed = 1;
  13. ngx_post_event(&coctx->sleep, &ngx_posted_delayed_events);
  14. #else
  15. ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "ngx.sleep(0)"
  16. " called without delayed events patch, this will"
  17. " hurt performance");
  18. ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
  19. #endif
  20. } else { /* 添加定时器 */
  21. ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
  22. }
  23. /* 外层函数*/
  24. return lua_yield(L, 0);

run_thread()里将当前协程上下文置为NULL,然后返回NGX_AGAIN

by_chunk()里会先检查有没有在post队列里的线程,如果没有则返回

  1. rc = ngx_stream_lua_run_thread(L, r, ctx, 0);
  2. if (rc == NGX_ERROR || rc >= NGX_OK) {
  3. /* do nothing */
  4. } else if (rc == NGX_AGAIN) {
  5. rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 0);
  6. } else if (rc == NGX_DONE) { /* 这里DONE的情况只有HTTP子请求的时候会出现 */
  7. rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 1);
  8. } else {
  9. rc = NGX_OK;
  10. }

当定时器超时时,它会执行sleep_handler(),设置ctx->cur_co_ctx然后执行run_thread()恢复协程调度。

其对应的C函数实现是ngx_stream_lua_socket_tcp_receive(),里面会调ngx_stream_lua_socket_tcp_receive_helper()。碰到读等待的情况,也是先设置好handler和coctx,然后lua_yield()。我们来看下里面代码:

  1. /* 这里0表示还未进行协程切换 */
  2. u->read_waiting = 0;
  3. u->read_co_ctx = NULL;
  4. /* 读取的主要逻辑由此函数处理 */
  5. rc = ngx_stream_lua_socket_tcp_read(r, u);
  6. /* 不管是成功、出错或等待I/O,肯定会返回 */
  7. if(rc == NGX_ERROR) {
  8. /*...*/
  9. }
  10. if(rc == NGX_OK) {
  11. /*...*/
  12. }
  13. /* rc == NGX_AGAIN */
  14. /* 如果是等待I/O的情况,设置事件触发时的handler、当前协程上下文 */
  15. u->read_event_handler = ngx_stream_lua_socket_read_handler;
  16. coctx = lctx->cur_co_ctx;
  17. /* 设置请求的写事件handler,这个是返回到Lua层前调用的handler */
  18. r->write_event_handler = ngx_stream_lua_content_wev_handler;
  19. /* 保存当前协程上下文到u上 */
  20. u->read_co_ctx = coctx;
  21. /* 表示是后续是需要协程恢复的 */
  22. u->read_waiting = 1;
  23. /* 设置准备返回值的回调 */
  24. u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler;
  25. return lua_yield(L, 0);

回到run_thread(),同样是将当前协程上下文置为NULL,然后返回NGX_AGAIN

当事件被触发时,执行前面设置的ngx_stream_lua_socket_read_handler(),里面又会调用读取操作核心函数ngx_stream_lua_socket_tcp_read()。如果继续碰到等待I/O,handler直接结束,等待下一次事件。如果是完成或出错,会执行如下操作:

  1. /* 恢复该值为0 */
  2. u->read_waiting = 0;
  3. /* 获取协程上下文 */
  4. coctx = u->read_co_ctx;
  5. /* 设置协程恢复的handler */
  6. ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
  7. /* 设置下一个调度的上下文,为之前调用读取操作的协程 */
  8. ctx->cur_co_ctx = coctx;
  9. /* 这个handler就是yield之前设置的那个,它里面调用 ctx->resume_handler */
  10. r->write_event_handler(r);

r->write_event_handler(r);是返回Lua层前调用的handler,里面会调用resume_handlerngx_stream_lua_socket_tcp_read_resume()只是封装了一下,最终都是调用的ngx_stream_lua_socket_tcp_resume_helper(),我们看来下它的代码:

  1. /* 待恢复协程上下文 */
  2. coctx = ctx->cur_co_ctx;
  3. u = coctx->data;
  4. prepare_retvals = u->read_prepare_retvals;
  5. /* 准备返回值 */
  6. nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);
  7. /* 恢复协程调度,回到Lua层 */
  8. rc = ngx_stream_lua_run_thread(vm, r, ctx, nret);

至于完成的条件,取决与不同的调用方式。如果是读取固定字节数的话,会维护一个剩余待读取的字节数u->rest。如果是读取一行,则读取到\n就结束。如果是readall,则一直读到u->eof为止。

为了不失完整性,再说一下正常结束和出错时的情况。正常执行完毕时,会设置协程状态,然后清理它的僵尸子线程:

  1. /* 将当前协程状态置为DEAD */
  2. ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_DEAD;
  3. /* 如果子线程有僵尸线程,则清理之 */
  4. if (ctx->cur_co_ctx->zombie_child_threads) {
  5. ngx_stream_lua_cleanup_zombie_child_uthreads(
  6. r, L, ctx, ctx->cur_co_ctx);
  7. }

接下来,根据结束的协程的类型不同执行不同的操作:

此时直接删除线程即可,然后根据是否还有用户线程,选择返回NGX_AGAINNGX_OK

  1. if (ngx_stream_lua_is_entry_thread(ctx)) {
  2. /* 将虚拟机栈清空 */
  3. lua_settop(L, 0);
  4. /* 删除当前线程,会从REGISTY表中解引用当前协程的`coctx->co_ref` */
  5. ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
  6. /* 如果还有其他用户线程,返回NGX_AGAIN */
  7. if (ctx->uthreads) {
  8. ctx->cur_co_ctx = NULL;
  9. return NGX_AGAIN;
  10. }
  11. /* all user threads terminated already */
  12. goto done; /* 到这就圆满结束了 return NGX_OK; */
  13. }

此时如果父协程已经死了,处理方式跟入口线程一样,即删除线程,然后根据是否还有任何用户线程或入口线程,选择返回NGX_AGAINNGX_OK

如果父协程还活着,并且已经在wait它了,直接恢复父协程。否则,加入到父协程的僵尸线程列表中。

  1. if (ctx->cur_co_ctx->is_uthread) {
  2. /* 清空虚拟机栈 */
  3. lua_settop(L, 0);
  4. /* 获取父协程 */
  5. parent_coctx = ctx->cur_co_ctx->parent_co_ctx;
  6. /* 如果父协程还活着 */
  7. if (ngx_stream_lua_coroutine_alive(parent_coctx)) {
  8. /* 并且在wait当前线程,则恢复父协程 */
  9. if (ctx->cur_co_ctx->waited_by_parent) {
  10. ngx_stream_lua_probe_info("parent already waiting");
  11. ctx->cur_co_ctx->waited_by_parent = 0;
  12. success = 1;
  13. goto user_co_done;
  14. }
  15. /* 否则将当前线程挂到父协程的僵尸子线程中 */
  16. if (ngx_stream_lua_post_zombie_thread(r, parent_coctx,
  17. ctx->cur_co_ctx)
  18. != NGX_OK)
  19. {
  20. return NGX_ERROR;
  21. }
  22. /* 压入第一个返回值true,以备后续wait时返回 */
  23. lua_pushboolean(ctx->cur_co_ctx->co, 1);
  24. lua_insert(ctx->cur_co_ctx->co, 1);
  25. /* 设置当前线程状态为ZOMBIE */
  26. ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_ZOMBIE;
  27. ctx->cur_co_ctx = NULL;
  28. return NGX_AGAIN; /* 返回上层 */
  29. }
  30. /* 如果父协程已经死了,直接删除当前线程
  31. * 会从REGISTY表中解引用当前协程的`coctx->co_ref` */
  32. ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
  33. ctx->uthreads--;
  34. /* 如果没有用户线程了 */
  35. if (ctx->uthreads == 0) {
  36. /* 入口线程在活着,返回上层 */
  37. if (ngx_stream_lua_entry_thread_alive(ctx)) {
  38. ctx->cur_co_ctx = NULL;
  39. return NGX_AGAIN;
  40. }
  41. /* all threads terminated already */
  42. goto done; /* 到这就圆满结束了 return NGX_OK; */
  43. }
  44. /* 如果还有其他用户线程,返回上层 */
  45. ctx->cur_co_ctx = NULL;
  46. return NGX_AGAIN;
  47. }

剩下的就是用户协程的情况,这个情况跟用户线程被父协程wait的情况是一样的。主要是将返回值移动到父协程栈中,然后跳到主循环前面恢复父协程的执行。

  1. success = 1;
  2. /* 获取返回值个数 */
  3. nrets = lua_gettop(ctx->cur_co_ctx->co);
  4. next_coctx = ctx->cur_co_ctx->parent_co_ctx;
  5. next_co = next_coctx->co;
  6. /* 将返回值移到父协程栈中 */
  7. if (nrets) {
  8. lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
  9. }
  10. /* 如果是用户线程,删除之 */
  11. if (ctx->cur_co_ctx->is_uthread) {
  12. ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
  13. ctx->uthreads--;
  14. }
  15. /* 除了wrap的用户协程,加上第一个true的返回值 */
  16. if (!ctx->cur_co_ctx->is_wrap) {
  17. /* ended successfully, coroutine.resume returns true plus
  18. * any return values
  19. */
  20. lua_pushboolean(next_co, success);
  21. lua_insert(next_co, 1);
  22. nrets++;
  23. }
  24. /* 设置父协程的状态为RUNNING */
  25. ctx->cur_co_ctx = next_coctx;
  26. next_coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
  27. /* 回到主循环前面,恢复父协程的执行 */
  28. continue;

大致处理步骤是,恢复cur_co_ctx,获取虚拟机L栈上错误信息,获取当前协程栈中错误信息,后面的操作类似协程执行完毕时,根据不同的情况选择恢复父协程或者返回上层。

  1. /* 恢复cur_co_ctx */
  2. if (ctx->cur_co_ctx != orig_coctx) {
  3. ctx->cur_co_ctx = orig_coctx;
  4. }
  5. /* 设置当前协程状态为DEAD */
  6. ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
  7. /* 获取错误信息 */
  8. if (orig_coctx->is_uthread
  9. || orig_coctx->is_wrap
  10. || ngx_http_lua_is_entry_thread(ctx))
  11. {
  12. ngx_http_lua_thread_traceback(L, orig_coctx->co, orig_coctx);
  13. trace = lua_tostring(L, -1);
  14. if (lua_isstring(orig_coctx->co, -1)) {
  15. msg = lua_tostring(orig_coctx->co, -1);
  16. dd("user custom error msg: %s", msg);
  17. } else {
  18. msg = "unknown reason";
  19. }
  20. }

跟正常结束的处理一样,除了第一个返回值是false。

此时如果父协程已经死了,直接删除线程,然后根据是否还有任何用户线程或入口线程,选择返回NGX_AGAINNGX_OK

如果父协程还活着,并且已经在wait它了,直接恢复父协程。否则,加入到父协程的僵尸线程列表中。

ngx_stream_lua_request_cleanup()清理当前请求,里面会清理掉所有的用户创建的协程,然后清理入口协程自己。最后返回错误码。

如果是wrap的协程,将错误传递给父协程(就好像是父协程出错了,然后父协程重新走一遍上面的出错处理流程)。

如果是普通协程,则恢复父协程的执行,返回false和错误信息。

本博客已经迁移至CatBro’s Blog,那里是我自己搭建的个人博客,页面效果比这边更好,支持站内搜索,评论回复还支持邮件提醒,欢迎关注。这边只会在有时间的时候不定期搬运一下。

本篇文章链接

版权声明:本文为logchen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/logchen/p/15145525.html