skynet源码分析(八)一切都是消息

  skynet 使用的并发模型是 actor 模型,这就使得两个 actor 之间进行交互的时候全都是通过消息来实现的,而且不单单是 actor 之间交互使用了消息,前文提到的定时调用和网络事件也都是通过消息来通知目标服务的。可以说 skynet 中消息是一切交互的依托形式,本篇会尝试理清楚消息相关的实现。

消息的结构

1
2
3
4
5
6
struct skynet_message {
    uint32_t source; // 消息源的地址
    int session; // 消息的 session id
    void *data; // 消息的数据
    size_t sz; // 消息的长度和类型,高 8bit 为类型,其余为长度
};

  消息的结构非常简单,其中 session 由源服务生成,是给 call 调用提供支持的,如果是 send 消息的话则 session 会被设为 0。

发送消息

  不管是从 Lua 层还是 C 层发送消息,最终调用的接口都是 skynet_send 函数。

1
2
3
4
5
6
7
int skynet_send(struct skynet_context *context, 
                uint32_t source,
                uint32_t destination,
                int type,
                int session,
                void *data,
                size_t sz)

  可以看到这是一个参数非常多的函数,但是仔细一看就会发现参数的内容跟上面消息的结构基本上吻合。事实上这个函数做的事情也简单,用参数生成一个 struct skynet_message 结构的消息,然后 push 到目标服务的消息队列中去即可。

服务消息队列

  skynet 中的每个服务都有一个 struct message_queue 结构的消息队列,它对应了 actor 模型中的 mailbox,保存着本服务的全部待处理消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct message_queue {
    struct spinlock lock; // 自旋锁
    uint32_t handle; // 消息队列所属服务的 handle
    int cap; // 容量,动态长度,可以扩容
    int head; // 消息的头指针
    int tail; // 消息的尾指针
    int release; // 标记是否已经被释放
    int in_global; // 标记是否在全局队列中
    int overload; // 现在的负载
    int overload_threshold; // 超载警告的阈值
    struct skynet_message *queue; // 消息队列数组
    struct message_queue *next; // 下一个消息队列,链表结构
};

  从成员变量中可以看到消息队列的实现方式。首先它是自带有一个自旋锁的,因为很多线程都有可能同时修改某个服务的消息队列,所以修改的时候都需要加锁。它通过 head,tail,cap 和 queue 组成了一个环形动态长度的数组,初始的容量为 64,当容量不够用时,每次扩容到以前容量的 2 倍。此外消息队列通过 overload 和 overload_threshold 实现了一个负载警告的功能,当拿取消息的时候,如果当前消息数量超过了当前设置的负载阈值,会输出一条服务负载过重的警告信息。

全局消息队列

1
2
3
4
5
struct global_queue {
    struct message_queue *head;
    struct message_queue *tail;
    struct spinlock lock;
};

  skynet 中有一个全局的消息队列,当服务的消息队列中有待处理的消息时,它会被放在全局消息队列中来。worker 线程每次就是先从全局消息队列中拿一个服务队列出去,然后处理这个服务消息队列中的消息。
  struct global_queue 的结构比较简单,是一个 message_queue 的单向链表。带有一个自旋锁,修改链表要先加锁。

消息处理

注册回调函数

  每个服务都需要有一个回调函数,回调函数被用来处理消息的数据。通过调用 skynet_callback 来注册回调函数。

1
void skynet_callback(struct skynet_context *context, void *ud, skynet_cb cb)

  skynet_cb 是一个如下形式的函数指针。

1
2
3
4
5
6
7
typedef int (*skynet_cb)(struct skynet_context * context, 
                        void *ud, 
                        int type,
                        int session,
                        uint32_t source,
                        const void * msg,
                        size_t sz);

  不同的服务有不同的回调函数设置的位置,例如最常用的 snlua 服务会在 lua 脚本中调用 skynet.start 函数,这个函数里面就会把 lua_skynet.c 中的 _cb 设为回调函数。

数据打包与解包

  消息在发送之前有时需要对数据进行打包,比如说要发送的数据是一个 lua table 这种。如果消息的数据有进行过打包的话,那么目标服务在处理消息的时候就需要进行对应的解包操作。
  这就要求在消息的发送方和接收方两边对同一个类型的协议有相同的配置,起码 pack 和 unpack 这两个函数一定要可以实现互相转换。一个最常见的例子,在 snlua 服务中,“lua” 类型的协议的打包和解包函数分别为 skynet.pack 和 skynet.unpack,前者可以把 lua table 转为二进制数据,后者可以把前者转换的结果再次转为 lua table 格式。

执行消息回调

  执行回调本身只负责调用注册的回调函数,需要各个服务在回调函数中正确处理自己的业务。回调可以很简单很直接,也可以很复杂。比如在 snlua 服务的回调函数就比较复杂,当消息的回调被调用时,首先调用的是直接注册的 _cb,它会从索引表中取到注册时存进去的 lua 回调函数也就是 skynet.dispatch_message 并且执行它,执行中会根据协议的类型,在 proto 中找到对应的协议类型的参数,从中拿到之前注册过的消息分发函数 dispatch,用它创建一个协程,然后执行协程。

Lua 层接口

初始化服务

  skynet.start 是绝大部分 snlua 服务会在服务的入口文件里调用的方法。它通过 C 接口,把消息回调函数设为了 _cb,_cb 相当于是一个 wrapper 函数,它会把通过参数传入的 Lua 函数 skynet.dispatch_message 保存在 LUA_REGISTRYINDEX 中,等到 _cb 被调用的时候,它会取出这个函数,把参数传给它,并且调用它。也就是说基本上所有的 snlua 服务都是用的同一个回调函数 skynet.dispatch_message 来处理消息的。
  传给 skynet.start 的函数可以看作是初始化函数,一般会在里面调用 skynet.dispatch 来为关心的消息类型指定处理函数。

注册消息协议

  可以通过 skynet.register_protocol 来注册一类消息,其中 lua, response 和 error 是在 require skynet 的时候就已经注册过的了,不再需要重复注册。
  可以通过调用 skynet.dispatch 来修改某个消息类型的处理函数。在绝大多数 snlua 服务的初始脚本中,都包含对 skynet.dispatch 的调用,比如下面这个就是在注册 “lua” 协议的消息的回调函数。

1
2
3
skynet.dispatch("lua", function(...)
    -- ...
end)

  snlua 服务中会维护一个 proto 的 table 用来保存各类协议的数据,其中的数据正是由上文提到的 skynet.register_protocol 注册进来的。一个典型的协议会包含以下几种数据

1
2
3
4
5
6
7
8
{
    name = "lua",
    id = skynet.PTYPE_LUA,
    pack = skynet.pack,
    unpack = skynet.unpack,
    dispatch = function() end,
    trace = nil
}

发送消息

  skynet.send 是最常用的消息发送方法,有目标服务的句柄或是名字都可以发送消息。它会根据消息的类型,找到类型对应的打包函数,将其打包,调用 C 层的接口发送出去。
  skynet.call 可以理解为类似远程调用的那种感觉,执行以后会挂起本协程,等待目标服务对本消息进行返回以后,会再次从挂起点开始执行。call 的实现深度依赖了 Lua 的协程,发送消息的部分跟 send 是一样的,只是它会拿到 send 返回的 session,同时将 session 和当前运行的协程绑定,然后调用 yield 阻塞当前协程,等待唤醒。

处理消息

  在 skynet.dispatch_message 中可以看到,处理消息是使用 pcall 调用 raw_dispatch_message 来实现的。除了处理消息外,本函数还处理了 fork 相关的调用。
  raw_dispatch_message 中,如果消息的类型为 PTYPE_RESPONSE 也就是对 call 调用的回复,则会通过消息的 session 找到之前调用 call 挂起自己的协程,然后把结果通过 resume 传给协程,使其从挂起点之后继续执行。
  如果是其它消息类型,则通过类型拿到对应的处理函数,拿到一个新的协程,通过类型指定的解包方法解包数据,然后用协程执行处理函数和消息数据。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计