skynet源码分析(十四)协程的使用

  skynet 中有不少关键功能都是依赖 Lua 协程实现的,skynet 本身也对 Lua 协程有一些管理。本篇会尝试分析 skynet 中对 Lua 协程的使用方法。

协程池

  一般基于 Lua 的框架都会提供协程的复用,复用可以减少不断创建和销毁协程的开销。协程池是大部分框架的选择,在 skynet 中也是通过协程池实现的协程复用。

1
local coroutine_pool = setmetatable({}, { __mode = "kv" })

  协程池的结构是一个 table,被当作队列来使用,需要协程时通过 table.remove 从队首取出,用完以后放在队尾。协程池一开始是空的, Skynet 并不会在初始化时创建一些协程备用。

创建协程

  在 skynet 中创建协程的接口是 co_create(func),它会首先尝试从协程池中拿取一个协程,如果拿不到,会自己创建一个协程出来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
local function co_create(f)
    local co = tremove(coroutine_pool)
    if co == nil then
        co = coroutine_create(function(...)
            f(...)
            while true do
                local session = session_coroutine_id[co] -- 拿 co 对应的 session
                if session and session ~= 0 then
                    local source = debug.getinfo(f, "S")
                    skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
                            session,
                            skynet.address(session_coroutine_address[co]),
                            source.source, source.linedefined))
                end
                -- coroutine exit
                local tag = session_coroutine_tracetag[co]
                if tag ~= nil then
                    if tag then
                        c.trace(tag, "end")
                    end
                    session_coroutine_tracetag[co] = nil
                end
                local address = session_coroutine_address[co]
                if address then
                    session_coroutine_id[co] = nil
                    session_coroutine_address[co] = nil
                end

                -- recycle co into pool
                f = nil
                coroutine_pool[#coroutine_pool + 1] = co
                -- recv new main function f
                f = coroutine_yield "SUSPEND" -- 拿到新传进来的 f
                f(coroutine_yield()) -- 马上再 yield, 等待第二次 resume 才会真正执行
            end
        end)
    else
        -- pass the main function f to coroutine, and restore running thread
        local running = running_thread
        coroutine_resume(co, f) -- 这里的 resume 并不会真的执行,只是传递 f 进到协程里
        running_thread = running
    end
    return co
end

  因为协程在用完以后要放回协程池中,所以在创建的时候包裹中的函数除了要执行参数 func 以外,还需要一个无限循环。循环中可以接受新的函数,并且再次执行。
  skynet 通过使用 yield 和 resume 配合,实现了给一个创建好的协程传入要执行的函数。在协程的循环中,协程 yield 了两次,第一次 #33 行的 yield 等待目标函数的传入,第二次在 #34 行对 yield 的调用是等待第一次传入的函数的参数。co_create 的调用会把目标函数当作参数对协程调用 resume 完成第一步,并且返回协程,等待第二次调用。

协程与消息

  协程在 Skynet 中的大部分使用场景都是用来处理消息的,Skyent 中使用了几个 table 用来记录协程和消息的关系。

1
2
3
4
local session_id_coroutine = {} -- session 到 协程的映射
local session_coroutine_id = {} -- 协程到 session 的映射
local session_coroutine_address = {} -- 协程到消息的源服务地址的映射
local session_coroutine_tracetag = {} -- 记录协程的追踪标记

  在处理接收到的消息时,会根据消息类型获取其 dispatch 函数,并且为 dispatch 函数创建一个协程。创建以后会在 session_coroutine_id 和 session_coroutine_address 增加数据。当协程被 resume 执行完消息处理函数之后,协程的循环中会删除掉 session_coroutine_id 和 session_coroutine_address 的记录,并且把协程重新投入协程池中去。
  在发送消息时,如果是通过 skynet.send 发消息的话,不需要做记录,如果是通过 skynet.call 发消息的话,需要向 session_id_coroutine 中新增记录,保存 session 到协程的映射。有了这个映射关系,在处理回复的消息的时候,才能根据 session 拿到在等待它返回的协程,然后清除掉 session_id_coroutine 中的记录,并且把返回的消息交给等待的协程处理。
  session_coroutine_id 还有一个作用,可以判断是不是忘记回复消息的源服务了。如果在处理消息的时候使用 skynet.ret 回复了源服务的话,则会清掉 session_coroutine_id 中的数据。在协程的循环中,当消息的处理函数调用结束以后,会检查 session_coroutine_id 中本协程对应的 session,如果还没有清掉并且不为 0 的话,则说明这是一条需要回复但是并没有进行回复的消息。

fork

  skynet.fork 可以创建一个协程来执行一个函数,一般用在逻辑中需要调用阻塞接口,但是又不想阻塞在这里的情况,比如需要连续多次的 call 调用的情况就可以为每个 call 调用创建一个协程。另外在通过循环 sleep 实现连续的定时调用时也可以用 fork 来包裹需要不断调用的函数。
  它的实现就是用 co_create 创建了一个协程,包裹住目标函数。然后将其投入到了一个叫做 fork_queue 的队列中去,等待被调用。
  fork_queue 的调用时机其实很快,就在处理完本条消息以后,skynet.dispatch_message 就会为 fork_queue 中所有等待的协程执行 resume 来处理它。
  云风说可以把 skynet.fork 当作是更加高效的 skynet.timeout(0) 来看待,通过实现也可以看出,fork 确实比 timeout 要省略了很多步骤。

睡眠和唤醒

  skynet.wait 和 skynet.sleep 都可以让出当前协程,都是需要传入一个 token 来标识挂起的协程。区别是 skynet.sleep 需要指定一个唤醒的定时,如果在到期之前还没有被 skynet.wake 唤醒的话,则会由 timer 线程发送消息来唤醒。skynet.yield 也可以让出当前协程,不过它只是对 skynet.sleep(0) 的简单封装,这里不再赘述。
  wait 和 sleep 的实现没太大区别,最主要的区别是 session 的获取。wait 是直接调用 session 分配接口获得的,sleep 则是注册定时事件,通过定时事件返回 session 获得的。它们都通过同一个接口 suspend_sleep 将自己阻塞,并且把 session 和协程的映射关系保存在了 session_id_coroutine 中,把 token 和 session 的映射关系保存在了 sleep_session 中。
  skynet.wakeup 可以唤醒这两种睡眠的协程,包括还未到期的通过 sleep 接口睡眠的协程。但是逻辑并不在这个函数中,它负责的只是把要唤醒的协程的 token 插入到了 wakeup_queue 中。
  wakeup_queue 中等待唤醒的协程会在消息的处理协程挂起以后,通过 dispatch_wakeup 根据 wakeup 的调用顺序被依次唤醒。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计