skynet源码分析(十)网络分析

  网络相关的部分是我感觉 skynet 中最复杂的部分了,本篇中会尝试尽量完整的分析到网络相关的大部分功能的实现原理。

线程模型

  skynet 使用的线程模型是多线程 reactor 模型,有一条 socket 线程用来接收 epoll 事件,并且进行事件分发,有多条 worker 线程来执行事件。
  与其它使用类似模型的框架相比,skynet 最大的区别应该就是还使用了 Actor 的并发模型。socket 线程在处理 epoll 事件的时候,并不是直接把事件交给了 worker 线程来执行,而是把事件和相关的数据一起转化为一条 Actor 之间通用的消息,放到了目标 Actor 的消息队列中,这样等到 worker 线程处理 Actor 的消息队列时,自然会处理这个网络事件。

socket 管理器

结构分析

  每个 skynet 进程都有一个全局的 socket 管理器,它管理了整个进程中的所有 socket 连接,下面来看一下它内部的结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#define MAX_INFO 128
// MAX_SOCKET will be 2^MAX_SOCKET_P
#define MAX_SOCKET_P 16
#define MAX_SOCKET (1<<MAX_SOCKET_P)
#define MAX_EVENT 64
#define MAX_UDP_PACKAGE 65535

struct socket_server {
    volatile uint64_t time; // 时间,由 timer 线程更新,socket 线程直接读这个值
    int recvctrl_fd; // 接收命令的管道套接字
    int sendctrl_fd; // 发送命令的管道套接字
    int checkctrl; // 用来标记是否要检查控制台命令的标志
    poll_fd event_fd; // 全局的 epoll 套接字
    ATOM_INT alloc_id; // 已分配的id,不断累加
    int event_n; // 本次调用 epoll 得到的就绪的 fd 个数
    int event_index; // 目前处理到的 fd 序号
    struct socket_object_interface soi; // userobject 接口
    struct event ev[MAX_EVENT]; // 捕获的事件数组
    struct socket slot[MAX_SOCKET]; // 全部的 socket 对象
    char buffer[MAX_INFO]; // 临时缓冲区
    uint8_t udpbuffer[MAX_UDP_PACKAGE]; // udp 数据缓冲区
    fd_set rfds; // 要监听的读描述符集合,用于命令的 select
};

  各个变量的作用已经在注释中写出,这些变量具体的使用会在后面的实现分析中看到,大概由 epoll 相关的变量,socket 相关的变量,pipe fd 相关的变量组成。

结构初始化

  当服务器进程启动的时候,在之前文章提到过的启动函数 skynet_start 会被调用,它又会调用 socket_server_create 来创建一个全局的 socket_server 对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
struct socket_server *socket_server_create(uint64_t time) {
    int i;

    // 创建用于命令的管道套接字
    int fd[2];

    // 创建 epoll 套接字
    poll_fd efd = sp_create();

    // 检查创建是否成功
    if (sp_invalid(efd)) {
        skynet_error(NULL, "socket-server: create event pool failed.");
        return NULL;
    }

    // 创建管道,这里并未将管道的套接字设为非阻塞
    if (pipe(fd)) {
        sp_release(efd);
        skynet_error(NULL, "socket-server: create socket pair failed.");
        return NULL;
    }

    // 把 fd[0] 加入到 epoll 的管理中
    if (sp_add(efd, fd[0], NULL)) {
        // add recvctrl_fd to event poll
        skynet_error(NULL, "socket-server: can't add server fd to event pool.");
        close(fd[0]);
        close(fd[1]);
        sp_release(efd);
        return NULL;
    }

    // 创建结构体并进行一些初始化
    struct socket_server *ss = MALLOC(sizeof(*ss));
    ss->time = time;
    ss->event_fd = efd;
    ss->recvctrl_fd = fd[0];
    ss->sendctrl_fd = fd[1];
    ss->checkctrl = 1;

    // 填充 socket 管理器的套接字插槽
    for (i = 0; i < MAX_SOCKET; i++) {
        struct socket *s = &ss->slot[i];
        // 初始化连接状态
        ATOM_INIT(&s->type, SOCKET_TYPE_INVALID);
        // 清空高优先级缓冲区
        clear_wb_list(&s->high);
        // 清空低优先级缓冲区
        clear_wb_list(&s->low);
        // 初始化锁
        spinlock_init(&s->dw_lock);
    }
    ATOM_INIT(&ss->alloc_id, 0);
    ss->event_n = 0;
    ss->event_index = 0;
    memset(&ss->soi, 0, sizeof(ss->soi));
    // 清空监听描述符集
    FD_ZERO(&ss->rfds);
    assert(ss->recvctrl_fd < FD_SETSIZE);

    return ss;
}

  虽然函数比较长,但是做的事情还是很简单的,基本就是创建了一个 socket_server 类型的变量,然后初始化它,最后返回,比较重要的几个步骤如下。

  • 创建了用于 ctrl 命令的管道套接字 fd[0] 和 fd[1]
  • 创建了 epoll 套接字
  • 将管道的接收端加入 epoll 套接字的管理中
  • 初始化 slot 数组中的全部数据

  稍微需要注意的地方是 slot 数组,它的总长度是 MAX_SOCKET 也就是 65535,它的长度是固定的,在一开始被初始化,也不会扩容,这意味着单个 skynet 进程支持的连接上限是 65535 个,绝大部分情况下是够用了。

epoll 相关

  skynet 只提供了两个平台的兼容性,Linux 和 MacOS,为了兼容这两个平台,所以在 IO 复用的系统接口这边肯定会做一次上层的封装。照例是在 Linux 下使用 epoll,在 MacOS 下使用 kqueue,本文只关注 epoll 的部分,因为我不会 kqueue,233。
  所有关于 IO 复用的接口都在文件 socket_poll.h 中封装,用的是我非常不喜欢的先定义接口,然后再根据宏选择 include 哪个头文件的方法实现,这种方法对所有代码提示器都是灾难。

1
2
3
4
5
6
7
8
static bool sp_invalid(poll_fd fd);
static poll_fd sp_create();
static void sp_release(poll_fd fd);
static int sp_add(poll_fd fd, int sock, void *ud);
static void sp_del(poll_fd fd, int sock);
static int sp_enable(poll_fd, int sock, void *ud, bool read_enable, bool write_enable);
static int sp_wait(poll_fd, struct event *e, int max);
static void sp_nonblocking(int sock);

  上面的这些就是 skynet 提供的 IO 复用的所有接口了,具体实现就不展开聊了,都是对 epoll 的接口进行的简单封装而已。
  顺带提一句 skynet 使用 epoll 的模式是水平触发,这一点跟大部分其它框架的实现都一致。

self-pipe

  从 socket_server 的初始化函数 socket_server_create 中可以看到,recvctrl_fdsendctrl_fd 分别是管道 pipe 套接字的两端,并且 recvctrl_fd 被加入到了 epoll 监听中。
  这用到了一个叫做 self-pipe 的技术,《Linux 系统编程手册》65.5.2 有介绍这个技术。它在 skynet 中的应用主要是为了解决单网络线程阻塞在 epoll 的 wait 上这种情况。
  当 socket 线程阻塞在 epoll_wait 上时,如果需要修改 epoll 的监听事件列表,比如说想要使用 epoll_ctl 增加一个新的监听事件,那么大体上有三种方法。

  1. 在调用 epoll_wait 时设置一个超时时间,这样做的话,可以让 socket 线程每过一段时间从 epoll_wait 中解放出来,来进行需要的操作,然后再次调用 epoll_wait 陷入阻塞中。
  2. 使用 worker 线程直接调用 epoll_ctl 对监听的事件集进行修改,是的,epoll 的监听事件集是线程安全的,可以一个线程阻塞在 epoll_wait 上,另一个线程修改事件集。
  3. 将一个专门用于接收事件修改命令的套接字加入到 epoll 的监听中,当需要进行操作时,向套接字中写入命令,这样接收端会变得可读,然后 socket 线程会从 epoll_wait 中被唤醒。

  首先第一种基本可以直接被 pass 了,因为它注定会有额外的 CPU 消耗,很难确定一个合适的超时时间。然后第二种看起来应该是最方便的,但是不知道为什么都没什么人用,貌似是因为早期版本的内核对这个支持的不好吧。第三种就是最常用的了,也是 skynet 的做法,这里用于接收命令的套接字很多框架都会使用管道来实现,简单一些。

socket 结构体

结构分析

  socket 是 skynet 中为单个连接定义的结构体,它全部在上面提到的 slot 数组中,中途不会创建,会一直复用已经创建好的对象。来看下它的结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct socket {
    uintptr_t opaque; // 本结构关联的服务 handle
    struct wb_list high; // 高优先级队列
    struct wb_list low; // 低优先级队列
    int64_t wb_size; // 等待写入的字节长度
    struct socket_stat stat; // 连接状态数据
    ATOM_ULONG sending; // 是否正在发送数据,是一个引用计数,会累加
    int fd; // 套接字
    int id; // 分配的 id
    ATOM_INT type; // 当前连接状态
    uint8_t protocol; // 连接协议
    bool reading; // fd 的 read 监听标记
    bool writing; // fd 的 write 监听标记
    bool closing; // fd 的 close 标记
    ATOM_INT udpconnecting; // udp 正在连接
    int64_t warn_size; // 报警阈值
    union {
        int size; // tcp 连接用 size 表示每次读取的字节数
        uint8_t udp_address[UDP_ADDRESS_SIZE]; // udp 用 udp_address 表示地址
    } p;
    struct spinlock dw_lock; // 自旋锁
    int dw_offset; // 已经写入的大小
    const void *dw_buffer; // dw 待发送的数据缓存,优先级很高
    size_t dw_size; // dw 写出的总大小
};

  socket 连接是个很复杂的结构,而且因为要照顾内存对齐的原因,变量定义的先后顺序排列并不是按照关联性分布的。变量的大概作用已经在注释中写出来了,这里还是先不急解释其中的用法,后面会逐步看到这些变量的用法。
  目前需要关注的是,每个 socket 结构体有一个自己的 id,这个 id 就是 slot 中的索引,另外 opaque 保存了持有该连接的服务的 handle,当连接上有网络消息时,socket 线程会给到这个服务去处理。其它大多是一些网络状态的字段,后面会逐渐看到。

为什么需要 id

  可以看到在 socket 中除了 fd 还有一个 id,那么能不能直接使用 fd 作为socket 的唯一标识呢?只从 fd 的分配范围上来看的话,是没什么问题的,但是因为 fd 的分配策略,内核可能会复用 fd,这就导致了冲突发生的可能性,skynet 分配的 id 会不断累加,在非极端情况下都不会有重复的风险,作为 socket 的唯一标识是更加合适的。
  skynet 在分配 socket 的 ID 时,也会碰到空洞位置的问题,因为关闭的 socket 连接会被再次设为可用状态,这就导致了分配 ID 不能简单累加,而是从 alloc_id 开始,遍历完整个 slot 数组,计算哈希的时候直接针对 MAX_SOCKET 取模即可。这里有个小问题,alloc_id 是个原子变量,可能会让这个分配 ID 的函数的效率雪上加霜,最坏情况下在分配一次 ID 的过程中,alloc_id 要被 atomic_fetch_add 累加几万次。

socket 线程

thread_socket

  socket 线程启动的主函数为 thread_socket,下面是它的代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
static void *thread_socket(void *p) {
    struct monitor *m = p;
    skynet_initthread(THREAD_SOCKET); // 设置线程类型
    for (;;) {
        int r = skynet_socket_poll();
        if (r == 0)
            break; // 退出网络轮询
        if (r < 0) {
            CHECK_ABORT
            continue; // 一般是还有消息没处理完,直接继续循环
        }
        wakeup(m, 0); // 只有当全部 worker 线程都睡眠的时候才会唤醒一个
    }
    return NULL;
}

  这个函数做的事情很简单,不断调用 skynet_socket_poll,根据它的返回值来进行操作,当返回值大于 0 时,会调用 wakeup 尝试唤醒 worker 线程来处理工作。

skynet_socket_poll

  函数 skynet_socket_poll 也就是 socket 线程的主循环了,下面来看一看它的内容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);
    struct socket_message result;
    int more = 1; // 还有剩余事件没处理完的标记
    int type = socket_server_poll(ss, &result, &more); // 处理 epoll 事件
    switch (type) {
        case SOCKET_EXIT:
            return 0;
        case SOCKET_DATA:
            forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
            break;
        case SOCKET_CLOSE:
            forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
            break;
        case SOCKET_OPEN:
            forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
            break;
        case SOCKET_ERR:
            forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
            break;
        case SOCKET_ACCEPT:
            forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
            break;
        case SOCKET_UDP:
            forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
            break;
        case SOCKET_WARNING:
            forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
            break;
        default:
            skynet_error(NULL, "Unknown socket message type %d.", type);
            return -1;
    }
    if (more) {
        return -1;
    }
    return 1;
}

  skynet_socket_poll 虽然比较长,但是可以看到它就是调用 socket_server_poll 来获取一个触发的 epoll 事件和事件的数据结果 result,然后根据事件的类型,走不同的 case,调用 forward_message 发送消息。

forward_message

  在函数 forward_message 中,做的事情就是将 result 参数里的数据打包成一条服务间的消息 skynet_message,然后发送给与本套接字绑定的服务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void forward_message(int type, bool padding, struct socket_message *result) {
    struct skynet_socket_message *sm;
    size_t sz = sizeof(*sm);
    if (padding) {
        if (result->data) {
            // 消息长度限制不能超过 128
            size_t msg_sz = strlen(result->data);
            if (msg_sz > 128) {
                msg_sz = 128;
            }
            sz += msg_sz;
        } else {
            result->data = "";
        }
    }
    sm = (struct skynet_socket_message *) skynet_malloc(sz);
    sm->type = type;
    sm->id = result->id;
    sm->ud = result->ud;
    if (padding) {
        sm->buffer = NULL;
        // 把 data 的数据追到 sm 的最后面
        memcpy(sm + 1, result->data, sz - sizeof(*sm));
    } else {
        sm->buffer = result->data;
    }

    // 构造一条新的 skynet 消息
    struct skynet_message message;
    message.source = 0;
    message.session = 0;
    message.data = sm;
    message.sz = sz | ((size_t) PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);

    if (skynet_context_push((uint32_t) result->opaque, &message)) { // 发出消息
        // todo: report somewhere to close socket
        // don't call skynet_socket_close here (It will block mainloop)
        skynet_free(sm->buffer);
        skynet_free(sm);
    }
}

  socket_message 中的 opaque 字段与 socket 中的意思一样,都是目标服务的 handle,构造好消息以后,调用了 skynet_context_push 将消息发送给了目标服务。

socket_server_poll

  函数 socket_server_poll 是 skynet 网络部分处理网络事件的最终循环。该函数会阻塞在 epoll_wait 上,当从 epoll_wait 中唤醒以后,开始一轮事件的处理。鉴于这是一个比较长的函数,直接贴全部源码来有点吓人,所以我会拆开来分析这个函数。首先来看一个全貌。

1
2
3
4
5
6
7
int socket_server_poll(struct socket_server *ss, struct socket_message *result, int *more) {
    for (;;) {
        // 1. 处理网络命令
        // 2. 处理一轮事件完毕以后的操作
        // 3. 处理单个事件
    }
}

  要事先说明的是 socket_server_poll 并不是一次处理全部事件的,它会将触发的事件保存下来,然后执行一个事件,将执行结果直接返回给我们上面提到的 skynet_socket_poll,由它根据不同的返回值来进行不同的消息转发。所以可以把一次 epoll_wait 返回得到的触发事件称为“一轮事件”,如果本轮事件还未结束时,不会再次进入 epoll_wait 的阻塞中,而是会继续处理之前保存的事件,直到之前保存的事件都处理完毕了,才会再次陷入阻塞拿取新的触发事件。
  按顺序我们先来看第一部分,处理网络命令部分的源码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if (ss->checkctrl) {
    // 读取指令
    if (has_cmd(ss)) {
        int type = ctrl_cmd(ss, result);
        if (type != -1) {
            clear_closed_event(ss, result, type);
            return type;
        } else
            continue;
    } else {
        ss->checkctrl = 0;
    }
}

  这一部分首先会检查 socket_servercheckctrl 字段是否为 1,如果为 1,那么会调用 has_cmd 尝试从命令管道的接收端读取数据。如果成功读到了命令,那么会调用 ctrl_cmd 来处理一条命令,返回值如果是 -1 代表命令执行成功且不需要通知关联的服务。   在每轮的处理中,首先会把检查控制命令的变量 checkctrl 置为 1,然后开始依次处理本次触发的网络事件。
  在网络事件的处理中,如果碰到的是命令事件,则直接 continue 回到循环的最上面去处理网络命令。否则则会去读取 socket 的状态 type,根据不同的 type 执行不同的操作,向 result 中填充相关的数据。
  每次从 epoll 中取到的就绪套接字个数放在 event_n 中,用一个变量 event_index 保存当前处理到了第几个套接字。当 event_index == event_n 的时候,则说明本轮的处理已经结束了,线程会再次调用 epoll_wait 获取下一轮要处理的就绪套接字。
  本函数会填充 result 参数,并且返回一个处理结果类型给 skynet_socket_poll,返回的结果一共包含了八种类型。

1
2
3
4
5
6
7
8
#define SOCKET_DATA 0       // 读取数据
#define SOCKET_CLOSE 1      // socket 关闭
#define SOCKET_OPEN 2       // socket 连接成功
#define SOCKET_ACCEPT 3     // accept 成功
#define SOCKET_ERR 4        // socket 错误
#define SOCKET_EXIT 5       // 退出 socket 线程
#define SOCKET_UDP 6        // 接收 udp 数据
#define SOCKET_WARNING 7    // socket 警告

网络指令处理

  每当 epoll_wait 返回时,新的一轮网络事件的处理就会开始,指令的检查标记 checkctrl 也会被设为 1,来开启指令检查。每轮只会处理一次指令,会一直连续处理指令直到全部处理完。
  通过 has_cmd 来检查管道中有没有还没处理的命令数据。这一步是使用系统调用 select 来实现的,使用 select 来检查 recvctrl_fd 是否可读。虽说 recvctrl_fd 被加到了 epoll 中,但是 epoll_wait 唤醒以后,如果是指令数据唤醒的,不会原地处理,而是等下一个循环处理,所以这里还要再检查一次是否可读,并没有以来 epoll 做标记之类的,可能是为了处理简单一些。
  如果 recvctrl_fd 中有指令等待读取,则调用 ctrl_cmd 读取并执行指令。其中用了两次 block_readpipe 来读取管道中的数据,第一次读取了数据头,包括了命令类型和数据长度,第二次用第一次读取到的数据长度读取了数据。
  block_readpipe 是用来从管道中读取数据的函数,可以看到一个有意思的地方,read 的返回值只处理了小于 0 的情况,并没有处理 n > 0 && n < size 的情况。这是因为对管道套接字执行 read 操作是一个原子操作,不会被别的情况比如信号之类的打断,所以只有两种可能,错误和全部读取完成。关于 pipe 套接字的读写后面可以考虑开一篇文章写一下。
  读取到了命令以后,根据命令类型,把数据交给不同的处理函数来处理即可。

网络请求

概述

  skynet 中涉及网络的操作,除了 direct write 以外,都是通过网络请求来实现的。worker 线程根据自己想要做的操作的类型,创建不同结构的请求数据,通过 send_request 发送到命令管道的发送端 sendctrl_fd 中去,等待主循环中接受处理请求。

请求包的结构

  因为 C 中没有面向对象的功能,所以通过 union 来实现了请求包的结构,每一种类型的请求对应了一类的请求结构体,所有的请求都会转化成一个 request_package 结构体,发送到管道中来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
struct request_package {
    uint8_t header[8]; // 6 bytes dummy, 第 7 个字节表示类型,第 8 个字节表示长度
    union {
        char buffer[256];
        struct request_open open;
        struct request_send send;
        struct request_send_udp send_udp;
        struct request_close close;
        struct request_listen listen;
        struct request_bind bind;
        struct request_resumepause resumepause;
        struct request_setopt setopt;
        struct request_udp udp;
        struct request_setudp set_udp;
    } u;
    uint8_t dummy[256]; // 预留了 256 个字节
};

处理请求包

  socket 线程的主循环中,会不断读取管道中的数据,每条命令要执行两次 read 读取,第一次要读到 header 中的内容,然后根据 header[1] 的长度数据,读取剩余的数据。然后根据 hander[0] 中的类型数据,进行不同的处理。目前一共有 13 种网络操作类型。

S Start socket
B Bind socket
L Listen socket
K Close socket
O Connect to (Open)
X Exit
D Send package (high)
P Send package (low)
A Send UDP package
T Set opt
U Create UDP socket
C set udp address
Q query info

网络接口分析

概述

  因为所有的网络操作都是通过发送命令来进行的,所以 skynet 的网络接口都是非阻塞的,不同的接口会完成基本的操作,然后把需要的参数打包成一条上面提到过的 request_package 结构数据发送给命令的接收端。

connect

  通过调用 socketdriver.connect 可以发起一个对外连接。lconnect 中会从目标地址的字符串中分离出 host 和 port 这两个参数,再获取一个 socket id,然后打包成一个网络请求,给命令接收套接字发送了一个 ‘O’ 类型的命令。
  在 socekt 线程中的部分里,对应命令的处理方法是 open_socket,首先它会通过系统调用 getaddrinfo 拿到目标主机的全部地址,然后依此对每个地址尝试去执行系统调用 socket 创建一个套接字并且把它设为 keep_alive 和 non_blocking 的,然后执行 connect 系统调用。
  如果套接字创建成功了,则创建 socket 结构体。检查 connect 的调用返回,如果成功了,则直接把 socket 的状态设为 SOCKET_TYPE_CONNECTED,返回 SOCKET_OPEN,连接成功。
  如果连接失败了,且 errno 是 EINPROGRESS,也就是说无法马上连接的状态,则把 socket 的状态设为 SOCKET_TYPE_CONNECTING,并且将套接字加入到 epoll 中,打开写入事件。当 epoll 触发了套接字的 write 事件时,则说明之前的连接已经建立成功了。将 socket 的状态设为 SOCKET_TYPE_CONNECTED,返回 SOCKET_OPEN 表示连接成功。
  SOCKET_OPEN 返回以后,会创建一个 SKYNET_SOCKET_TYPE_CONNECT 类型的消息给发起连接的源服务,但是源服务并不需要处理连接成功的消息,所以 netpack 在进行解包的时候,直接忽略了 SKYNET_SOCKET_TYPE_CONNECT 类型的消息。

listen

  从 gateserver 的 open 命令来分析一下 skynet 套接字的监听步骤。socketdriver.listen 是 skynet 的监听接口,可以开启一个监听套接字,函数返回值是监听套接字的唯一标识符,也就是上面提到的 id 这个变量。监听的接口执行可以分为两个阶段,第一个是在 worker 线程中执行的部分,第二个是在 socket 线程中执行的部分。
  监听操作在第一阶段的执行中,主要逻辑在 socket_server_listen 函数中,其中依次调用了 socket/bind/listen 等系统调用,完成了网络套接字的创建,绑定和监听,但是并未将套接字加入到 epoll 的管理中去。然后通过 reserve_id 分配了一个 socket 结构的唯一 id,但是这里也不会创建 socket 结构体。创建一个 request_package 的结构体,将上面拿到的参数填入其 request_listen 结构体中,然后将其发送给命令的接收套接字 sendctrl_fd 即可。
  监听操作在第二阶段的执行中,主要逻辑在 listen_socket 函数中。这里逻辑就比较简单了,做的事情就是上段中点明了剩下的两个部分,把监听套接字加入到 epoll 管理中,并且创建了 socket 结构变量。socket 中的 type 会被修改为 SOCKET_TYPE_PLISTEN,只是 pre listen 还没有完全完成监听。还有一点需要注意的是,因为上一阶段已经拿到了 socket 的唯一 id,所以这里是直接修改了之前那个 id 在数组 socket_server.slot 中对应的 socket 结构体。
  到这里 socketdriver.listen 的工作就全部结束了,但是明显可以发现这个时候的 listen 还未完全完成。因为此时还未设置 accept 以后的回调,而且 socket 中的状态也还是未完成的监听状态。我们现在有一个 socket id 是监听套接字的句柄,需要做的是使用这个 id 调用 socketdriver.start 来执行后续的步骤。
  socketdriver.start 中主要做的事情是给网络命令接收套接字发送了一个 ‘R’ 请求,这个请求会把 id 对应的套接字加入到 epoll 管理中并且打开读取监听。不过由于 listen 的前期创建 socket 的时候已经把套接字加入到了 epoll 并且默认是打开读取的,所以这里并不会做什么操作。这里最主要修改的是上面提到的 socket 的状态,会把状态改为 SOCKET_TYPE_LISTEN,以让循环中可以正确处理监听,然后还把监听 socket 的源服务改为了调用 start 的服务,也就是说可以实现在某个服务中 listen 创建一个 socket id,然后把它传给另一个服务,由另一个服务调用 start 来接收后续的消息。

accept

  accept 由 socket 线程的 epoll 循环触发,如果触发网络事件的套接字是 SOCKET_TYPE_LISTEN 状态的话,则说明触发了 accept 事件。
  report_accept 是处理 accept 事件的函数,首先通过系统调用 accept 拿到网络套接字,然后拿到 socket 结构要用的唯一 id,client 的 fd 会被设置 keep_live 和 no_blocking,创建 socket 结构,把 socket 的状态设为 SOCKET_TYPE_PACCEPT 类型。
  上述处理结束以后,返回到 skynet_socket_poll 的类型为 SOCKET_ACCEPT,skynet_socket_poll 会调用 forward_message 给 socket 的源服务发消息,socket 的源服务现在是上一步中执行 socketdriver.start 的服务。发送的消息中把消息结构体中的 type 设为了 SKYNET_SOCKET_TYPE_ACCEPT 来标识消息的类型。
  消息会交给 gate 服务处理,它注册了 “socket” 类型的协议,负责解包的是 netpack.filter 函数。lfilter 在处理 SKYNET_SOCKET_TYPE_ACCEPT 时很简单,只是整理了一下参数而已,压入了操作类型对应的字符串 “open” 供 dispatch 方法调用。
  在 dispatch 中,“open” 操作对应了 MSG.open 方法,其中跟网络层有关的就是调用了 handle.connect,在 handle.connect 中,历经千难万苦,如果是用 examples 中提供的示例的话,就是经过了 watchdog 和 agent 的操作,最终调用了 socketdriver.start 来激活客户端的 socket 结构。与 listen 的步骤类似,在 start 中会把 client 的 socket 类型从 SOCKET_TYPE_PACCEPT 改为 SOCKET_TYPE_CONNECTED,socket 关联的服务 handle 会改为调用 socketdriver.start 的服务。

write

  发送数据有两种方法,常规的是通过 epoll 的写事件触发。为了减少一些开销,skynet 还做了一个叫做 direct write 的操作,也就是直接写入,不通过 socket 线程,而是在 worker 线程直接尝试把数据发出去。
  首先来看 direct write 的部分,发送数据的接口为 socket_server_send,这个函数首先会检查当前 socket 能不能直接发送,如果在当前 socket 的高或低优先级队列中有数据等待发送的,则不能直接发送。如果可以直接发送的话,则会直接在 worker 线程调用 write 往套接字中写入数据。
  直接发送会有三种结果。如果发送失败了,则忽略这个错误,当作写入了 0 长度的数据。如果完整发送了全部的数据,则可以直接返回,不需要再走后面的步骤了,本次发送已经完成了。如果只发送了部分数据,包括前面发送错误产生的结果,都会设置 dw_buffer/dw_size/dw_offset 这三个变量,等待后续 socket 线程再次进行数据发送,并且发送了一条网络消息 ‘W’ 来打开本套接字的写事件监听。
  触发 epoll 的写事件以后,如果有之前 direct write 阶段没发完的数据,会首先把剩余的数据加入到高优先级队列的首部。发送阶段,会优先先发高优先级队列的数据,然后再发低优先级队列中的数据,如果低优先级队列中的数据没有全部发完,则会借助一个叫做 raise_uncomplete 的操作,把剩余的数据提到高优先级队列中。
  在两种情况下会触发关闭套接字写事件的监听,首先是如果 write 如果返回了一个错误,且不是 EINTR(信号打断) 或者 EAGAIN(非阻塞套接字缓冲区写满) 错误的情况下会关闭写事件。还有就是当套接字的高优先级队列和低优先级队列都发送完毕的时候也会关闭写事件。

read

  当 epoll 中的套接字变为可读以后,如果是 TCP 连接,则使用 forward_message_tcp 读取套接字中的数据。
  每次读取的长度是 socket.p.size,初始为 64 字节,如果在本次读取的时候发现套接字中可读长度大于 size 的话,则将 size 扩大为之前的 2 倍,如果发现套接字中的数据比 size 的 1/2 还少的时候,则把 size 变为原来的 1/2 长度。另外,如果本次没有读取完套接字中的数据,则会减少 event_index,使下一次循环依然处理本事件。
  读取到数据以后,转化成消息,给 socket 的源服务发送一条 SKYNET_SOCKET_TYPE_DATA 类型的消息。这个消息会触发网络分包,通过 netpack.filter 进行分包,分包值得单开一篇文章细说一下,此处先一笔带过。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计