skynet源码分析(七)各种线程的作用

  skynet 进程中一共有五种线程,主线程、monitor 线程、timer 线程、socket 线程和 worker 线程。每种线程各司其职,本篇会讲述一下各种线程的作用。

主线程

  skynet 中的主线程只负责进行初始化的工作,在前篇讲启动流程的时候,那些操作都是发生在主线程中的。
  主线程不参与任何业务的处理,在初始化完所有的全局变量以后,它按配置创建出了剩下的四种线程,然后调用 pthread_join 阻塞自己,等待所有线程执行结束。

monitor 结构

  在介绍其余线程之前,首先要看一个结构体 monitor,它跟后面介绍的 monitor 线程同名,但是它并不是 monitor 线程专用的数据,而是给所有线程调度 worker 线程使用的。

1
2
3
4
5
6
7
8
struct monitor {
    int count; // worker 线程的总数
    struct skynet_monitor **m; // 全部 worker 线程的 monitor 指针
    pthread_cond_t cond; // 给 worker 线程挂起用的全局条件
    pthread_mutex_t mutex; // 有锁结构
    int sleep; // 睡眠的 worker 线程数量
    int quit; // 退出标记
};

  monitor 结构体变量每个进程只有一个,可以看到它是一个有锁的结构,这里可以发现一个与其它地方明显的区别,锁直接使用了 pthread 库的 mutex 来做,而没有用 skynet 中更常用的 spinlock 来实现,主要是因为要配合 cond 来做线程挂起。

monitor 线程

作用

  每个进程中只有以一个 monitor 线程,它的功能比较简单,monitor 顾名思义就是监控,它监控的就是所有 worker 线程的工作状态,如果 worker 线程在处理一条消息的时候用时太久了,monitor 线程会打印出一条错误日志,告诉开发者一条从 A 服务到 B 服务的消息的处理逻辑中可能有死循环存在。

实现

  来看一下 monitor 线程的主函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void *thread_monitor(void *p) {
    struct monitor *m = p;
    int i;
    int n = m->count; // 拿到 worker 线程的数量
    // 设置线程属性
    skynet_initthread(THREAD_MONITOR);
    for (;;) {
        CHECK_ABORT
        // 遍历全部的线程 monitor 进行检查
        for (i = 0; i < n; i++) {
            skynet_monitor_check(m->m[i]);
        }
        // 睡眠 5s
        // 使用循环分开调用是为了更快的触发 abort
        for (i = 0; i < 5; i++) {
            CHECK_ABORT
            sleep(1);
        }
    }

    return NULL;
}

  线程会进入一个无限循环中,它每 5s 对每个 worker 线程的 skynet_monitor 执行了 skynet_monitor_check 操作。
  每个 worker 线程都有一个与之绑定的 skynet_monitor 的变量,它正是被保存在了上文提到的 struct monitor 中,它负责记录本 worker 线程的检查状态。

1
2
3
4
5
6
struct skynet_monitor {
    ATOM_INT version; // 原子 version
    int check_version; // 上次检查时的 version
    uint32_t source; // 当前处理的消息的源 handle
    uint32_t destination; // 当前处理的消息的目标 handle
};

  每次当 worker 线程开始处理一条消息的时候,它会修改自己对应的 skynet_monitor 中变量的值。其中 version 会被一直累加,并且将 source 和 destination 设为当前处理的消息的源地址和目标地址。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void skynet_monitor_check(struct skynet_monitor *sm) {
    // 如果相等,说明还在处理上次检查的那条消息,给出警告
    if (sm->version == sm->check_version) {
        if (sm->destination) {
            // 设置 endless 标签
            skynet_context_endless(sm->destination);
            skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)",
                         sm->source, sm->destination, sm->version);
        }
    } else {
        // 如果不相等,这保存本次检查的 version
        sm->check_version = sm->version;
    }
}

  skynet_monitor_check 中的操作也很简单,就是检查 skynet_monitor 中的 vesioncheck_version 是否一致。如果 version 和 check_version 不相等,则把 check_version 设为 version,如果相等,则说明从上次检查到这次检查也就是 5s 之内,worker 线程都在处理同一条消息。这时候就认为这个 worker 线程可能已经陷入了死循环中。把目标服务的 endless 属性设为 true,然后输出一条错误日志警告开发者。
  通过阅读实现可以发现,monitor 线程只能起到非常微弱的辅助作用,那就是如果一条消息的执行时间超过 5s,就发出一次警告。

timer 线程

作用

  timer 线程主要负责了两件事情,更新系统的当前时间和唤醒一个睡眠的 worker 线程。前者是为了给时间的获取接口提供时间返回值,以及执行定时任务,后者是让之前因为没有取到消息处理而睡眠的 worker 线程再次尝试去处理消息。

自己计时的意义

  计时的实现很简单,记录了一个进程的开始时间 starttime 和一个进程从开始到现在经过的时间 current 来完成当前时间的计算,每经过 2.5ms 将 current 的值更新一次。
  之所以要框架自己实现计时的原因主要有两个。首先,可以为业务层取高精度时间提供一个高效的近似接口,不用每次都去调用 clock_gettime 取,提高了性能。第二个原因是自己实现的计时不会被系统时间影响,在进程执行的过程中如果系统时间被改掉了,自己实现的计时器还是会按以前的步骤执行,这样可以避免一些问题的发生,比如定时器的触发问题。

时间接口的使用

  在 skynet 中有四个时间接口,skynet.nowskynet.timeskynet.hpc 还有 Lua 官方提供的 os.time,要根据场景使用不同的接口。

  • skynet.now 直接返回了 current 的值,也就是进程启动到现在的时间,这是个近似接口,current 的更新由 timer 线程维护,精度为 2.5ms。
  • skynet.time 返回了 current + starttime 的时间,也就是当前的绝对时间,它的精度同样为 2.5ms。
  • skynet.hpc 则直接调用了 clock_gettime 拿取了进程启动的相对纳秒时间,这个是准确的时间,但是消耗是最大的。
  • os.time 接口是直接调用了 time 的,它返回的是秒级的绝对时间,是最快的。

定时器的实现

  skynet 中定时器的计时是使用时间轮算法来实现的,时间轮是一个被广泛用于实现高效率定时器的算法,linux kernel 的定时器也是使用时间轮算法实现的。
  skynet.timeout 是 skynet 提供的定时调用接口,它首先拿到一个新的 session 用于接收定时器的唤醒,然后创建一个包裹了等待执行的函数的协程,把协程跟 session 关联起来,然后用 session 和自己的 handle 创建一个 struct timer_event 加入到时间轮中。

1
2
3
4
struct timer_event {
    uint32_t handle;
    int session;
};

  等待时间轮转动到了指定的时间点以后,从里面取出这个 timer_event 结构体变量,使用其中的 session 来创建一条 PTYPE_RESPONSE 类型的消息,将消息 push 到目标 handle 的消息队列里。等 worker 线程处理这条消息时,根据 session 拿到对应的 co 执行它,一个定时器的调用就完成了。

socket 线程

作用

  socket 线程主要做了三件事,接收并且处理 socket 命令,处理 epoll 事件,然后如果当前全部的 worker 线程都在睡眠中,则唤醒其中的一个。
  需要注意的是,本线程并不会去执行任何 epoll 事件,所有事件都是转换成一条 PTYPE_SOCKET 类型的消息,发送给与相关 socket 绑定的服务。
  socket 线程的具体实现比较复杂,后面会专门开一篇网络专文,此处只大概讲一下作用,具体实现暂时略过。

worker 线程

作用

  worker 线程顾名思义,就是处理逻辑的主力线程了。它只做一件事情,处理消息。它会尝试去全局队列中拿到一个服务队列,然后再根据自己的负载参数,处理其中一定比例的消息。如果拿不到消息队列,会把自己投入睡眠中,等待 timer 线程或是 socket 线程唤醒自己。

数量

  worker 线程是这几类线程中唯一可以通过 config 中的配置参数修改线程数量的。一般把 worker 线程的数量设置为本机的 cpu 核心数即可。

工作参数

  每个 worker 线程有一个属于自己的参数结构体 worker_parm,用来保存一些本线程的参数。

1
2
3
4
5
struct worker_parm {
    struct monitor *m; // 全局 monitor 的引用
    int id; // 线程ID
    int weight; // 工作权重
};

  其中 weight 表示的是工作权重,目的是为了尽量让不同的 worker 线程的步骤不一样,从而减轻在全局消息队列那里的锁竞争问题。

1
2
3
4
5
6
static int weight[] = {
    -1, -1, -1, -1, 0, 0, 0, 0,
    1, 1, 1, 1, 1, 1, 1, 1,
    2, 2, 2, 2, 2, 2, 2, 2,
    3, 3, 3, 3, 3, 3, 3, 3,
};

  worker 线程在拿到服务的消息队列以后,会把队列长度 n »= weight 来得到本次要处理的消息数量。所以前四个线程每次只处理一条消息,后面的四个每次处理队列中的全部消息,再后面分别是每次处理总长度的 1/2,1/4,1/8 条消息,32 以后的 worker 线程的 weight 一律为 0,也就是每次处理消息队列中的全部消息。

实现

  照例先来看一下 worker 线程的主函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static void *thread_worker(void *p) {
    struct worker_parm *wp = p;
    int id = wp->id;
    int weight = wp->weight;
    struct monitor *m = wp->m;
    struct skynet_monitor *sm = m->m[id];
    skynet_initthread(THREAD_WORKER);
    struct message_queue *q = NULL;
    while (!m->quit) {
        q = skynet_context_message_dispatch(sm, q, weight);
        // 如果 q 是 NULL 的话,说明没有 pop 到要处理的消息队列,要把它投入到睡眠中去
        if (q == NULL) {
            // 获取全局 monitor 的锁
            if (pthread_mutex_lock(&m->mutex) == 0) {
                // 累加当前睡眠的线程数量
                ++m->sleep;
                // "spurious wakeup" is harmless,
                // because skynet_context_message_dispatch() can be call at any time.
                if (!m->quit)
                    // 等待在条件上,释放 mutex, 等待 socket 线程或是 timer 线程唤醒
                    pthread_cond_wait(&m->cond, &m->mutex);
                // 被唤醒后减少睡眠线程数
                --m->sleep;
                // 释放锁
                if (pthread_mutex_unlock(&m->mutex)) {
                    fprintf(stderr, "unlock mutex error");
                    exit(1);
                }
            }
        }
    }
    return NULL;
}

  在主函数中,会不断调用 skynet_context_message_dispatch,当它的返回值 q 不为 NULL 时,说明还有消息未处理完毕,它会被再次调用,并且 q 会被当作参数重新传给它。如果 q 为 NULL,则说明当前全局队列的消息已经处理完毕了,worker 线程会阻塞在 cond 上,等待其它线程唤醒它。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
struct message_queue *skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    if (q == NULL) {
        q = skynet_globalmq_pop();
        if (q == NULL)
            return NULL;
    }

    // 拿到服务消息队列的 handle
    uint32_t handle = skynet_mq_handle(q);
    // 通过消息队列的 handle 可以拿到对应的服务
    struct skynet_context *ctx = skynet_handle_grab(handle);
    // 如果拿不到对应的服务,说明这消息队列被废弃了
    if (ctx == NULL) {
        struct drop_t d = {handle};
        skynet_mq_release(q, drop_message, &d);
        return skynet_globalmq_pop();
    }

    int i, n = 1;
    struct skynet_message msg;

    for (i = 0; i < n; i++) {
        if (skynet_mq_pop(q, &msg)) { // 拿不到消息,直接返回
            skynet_context_release(ctx);
            return skynet_globalmq_pop();
        } else if (i == 0 && weight >= 0) {
            // 消息队列的总长度
            n = skynet_mq_length(q);
            // 按线程工作权重拿本次要处理的消息数量
            n >>= weight;
        }
        int overload = skynet_mq_overload(q);
        if (overload) {
            skynet_error(ctx, "May overload, message queue length = %d", overload);
        }

        // 更新 monitor 的记录和计数
        skynet_monitor_trigger(sm, msg.source, handle);

        if (ctx->cb == NULL) {
            skynet_free(msg.data);
        } else {
            // 处理消息
            dispatch_message(ctx, &msg);
        }

        // 更新 monitor 的记录和计数
        skynet_monitor_trigger(sm, 0, 0);
    }

    assert(q == ctx->queue);
    struct message_queue *nq = skynet_globalmq_pop();
    if (nq) {
        // 如果全局队列是非空的,会将 q push 回全局队列的最后,这是必要的,因为 q 的消息并不一定被处理完了
        // 当全局队列已经空了的时候,会继续返回 q,下一次还会处理 q,直到 q 中没有了消息
        // If global mq is not empty , push q back, and return next queue (nq)
        // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
        skynet_globalmq_push(q);
        q = nq;
    }
    skynet_context_release(ctx);

    return q;
}

  通过调用 skynet_globalmq_pop 从全局队列中取出一个服务队列,全局队列是个链表,此处加锁以后,从链表头部取出即可。
  拿到服务消息队列以后,需要从里面拿取消息,并且处理。这个过程在一个循环中,循环的次数跟本线程的工作权重 weight 有关,通过计算当前队列中待处理的消息总长度 n »= weight 得到本次要处理的消息条数。
  dispatch_message 用来处理消息,通过移位拿到消息的类型和长度,累加处理消息计数,然后调用 context 的 callback 函数进行处理。
  本轮消息处理完毕以后,会尝试获取一个新的服务队列,如果能拿到,不管当前处理的队列中还有没有剩余消息,都会把当前队列 push 到全局队列的末尾,如果拿不到新的队列,则继续处理本队列,即使它其中已经没有消息了,会在下一次检查消息长度的时候直接返回。
  如果一开始拿到的要处理的服务队列中本来就没有消息的话,则会把其 in_global 参数设为 0,且不会将其放回全局队列中,而是会尝试从全局队列中再拿取一个队列返回,来进行下一次的处理。这个没有消息也没有在全局队列中的服务队列会一直保持这个状态,直到下一次有其它服务向其发送消息,这时候会重新把其 in_global 参数设为 MQ_IN_GLOBAL,并把其 push 回全局队列中。
  当 worker 线程已经从全局队列中取不到服务队列时,它会锁上 monitor 结构的锁,然后累加休眠的线程数 sleep,并且使用 pthread_cond_wait 将线程阻塞在 monitor.cond 上,等待 socket 线程或是 timer 线程唤醒。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计