skynet 进程中一共有五种线程,主线程、monitor 线程、timer 线程、socket 线程和 worker 线程。每种线程各司其职,本篇会讲述一下各种线程的作用。
主线程
skynet 中的主线程只负责进行初始化的工作,在前篇讲启动流程的时候,那些操作都是发生在主线程中的。
主线程不参与任何业务的处理,在初始化完所有的全局变量以后,它按配置创建出了剩下的四种线程,然后调用 pthread_join
阻塞自己,等待所有线程执行结束。
monitor 结构
在介绍其余线程之前,首先要看一个结构体 monitor,它跟后面介绍的 monitor 线程同名,但是它并不是 monitor 线程专用的数据,而是给所有线程调度 worker 线程使用的。
|
|
monitor 结构体变量每个进程只有一个,可以看到它是一个有锁的结构,这里可以发现一个与其它地方明显的区别,锁直接使用了 pthread 库的 mutex 来做,而没有用 skynet 中更常用的 spinlock 来实现,主要是因为要配合 cond 来做线程挂起。
monitor 线程
作用
每个进程中只有以一个 monitor 线程,它的功能比较简单,monitor 顾名思义就是监控,它监控的就是所有 worker 线程的工作状态,如果 worker 线程在处理一条消息的时候用时太久了,monitor 线程会打印出一条错误日志,告诉开发者一条从 A 服务到 B 服务的消息的处理逻辑中可能有死循环存在。
实现
来看一下 monitor 线程的主函数。
|
|
线程会进入一个无限循环中,它每 5s 对每个 worker 线程的 skynet_monitor
执行了 skynet_monitor_check
操作。
每个 worker 线程都有一个与之绑定的 skynet_monitor 的变量,它正是被保存在了上文提到的 struct monitor 中,它负责记录本 worker 线程的检查状态。
|
|
每次当 worker 线程开始处理一条消息的时候,它会修改自己对应的 skynet_monitor 中变量的值。其中 version 会被一直累加,并且将 source 和 destination 设为当前处理的消息的源地址和目标地址。
|
|
skynet_monitor_check
中的操作也很简单,就是检查 skynet_monitor
中的 vesion
和 check_version
是否一致。如果 version 和 check_version 不相等,则把 check_version 设为 version,如果相等,则说明从上次检查到这次检查也就是 5s 之内,worker 线程都在处理同一条消息。这时候就认为这个 worker 线程可能已经陷入了死循环中。把目标服务的 endless 属性设为 true,然后输出一条错误日志警告开发者。
通过阅读实现可以发现,monitor 线程只能起到非常微弱的辅助作用,那就是如果一条消息的执行时间超过 5s,就发出一次警告。
timer 线程
作用
timer 线程主要负责了两件事情,更新系统的当前时间和唤醒一个睡眠的 worker 线程。前者是为了给时间的获取接口提供时间返回值,以及执行定时任务,后者是让之前因为没有取到消息处理而睡眠的 worker 线程再次尝试去处理消息。
自己计时的意义
计时的实现很简单,记录了一个进程的开始时间 starttime 和一个进程从开始到现在经过的时间 current 来完成当前时间的计算,每经过 2.5ms 将 current 的值更新一次。
之所以要框架自己实现计时的原因主要有两个。首先,可以为业务层取高精度时间提供一个高效的近似接口,不用每次都去调用 clock_gettime
取,提高了性能。第二个原因是自己实现的计时不会被系统时间影响,在进程执行的过程中如果系统时间被改掉了,自己实现的计时器还是会按以前的步骤执行,这样可以避免一些问题的发生,比如定时器的触发问题。
时间接口的使用
在 skynet 中有四个时间接口,skynet.now
,skynet.time
,skynet.hpc
,还有 Lua 官方提供的 os.time
,要根据场景使用不同的接口。
skynet.now
直接返回了current
的值,也就是进程启动到现在的时间,这是个近似接口,current
的更新由 timer 线程维护,精度为 2.5ms。skynet.time
返回了current
+starttime
的时间,也就是当前的绝对时间,它同样是个近似值,精度也是 2.5ms。skynet.hpc
则直接调用了clock_gettime
拿取了进程启动的相对纳秒时间,这个是准确的时间,但是消耗是最大的。os.time
接口是直接调用了time
的,它返回的是秒级的绝对时间,时间单位的精度是最差的,但是接口的调用速度是最快的。
定时器的实现
skynet 中定时器的计时是使用时间轮算法来实现的,时间轮是一个被广泛用于实现高效率定时器的算法,linux kernel 的定时器也是使用时间轮算法实现的。
skynet.timeout 是 skynet 提供的定时调用接口,它首先拿到一个新的 session 用于接收定时器的唤醒,然后创建一个包裹了等待执行的函数的协程,把协程跟 session 关联起来,然后用 session 和自己的 handle 创建一个 struct timer_event 加入到时间轮中。
|
|
等待时间轮转动到了指定的时间点以后,从里面取出这个 timer_event 结构体变量,使用其中的 session 来创建一条 PTYPE_RESPONSE 类型的消息,将消息 push 到目标 handle 的消息队列里。等 worker 线程处理这条消息时,根据 session 拿到对应的 co 执行它,一个定时器的调用就完成了。
socket 线程
作用
socket 线程主要做了三件事,接收并且处理 socket 命令,处理 epoll 事件,然后如果当前全部的 worker 线程都在睡眠中,则唤醒其中的一个。
需要注意的是,本线程并不会去执行任何 epoll 事件,所有事件都是转换成一条 PTYPE_SOCKET 类型的消息,发送给与相关 socket 绑定的服务。
socket 线程的具体实现比较复杂,后面会专门开一篇网络专文,此处只大概讲一下作用,具体实现暂时略过。
worker 线程
作用
worker 线程顾名思义,就是处理逻辑的主力线程了。它只做一件事情,处理消息。它会尝试去全局队列中拿到一个服务队列,然后再根据自己的负载参数,处理其中一定比例的消息。如果拿不到消息队列,会把自己投入睡眠中,等待 timer 线程或是 socket 线程唤醒自己。
数量
worker 线程是这几类线程中唯一可以通过 config 中的配置参数修改线程数量的。一般把 worker 线程的数量设置为本机的 cpu 核心数即可。
工作参数
每个 worker 线程有一个属于自己的参数结构体 worker_parm,用来保存一些本线程的参数。
|
|
其中 weight 表示的是工作权重,目的是为了尽量让不同的 worker 线程的步骤不一样,从而减轻在全局消息队列那里的锁竞争问题。
|
|
worker 线程在拿到服务的消息队列以后,会把队列长度 n »= weight 来得到本次要处理的消息数量。所以前四个线程每次只处理一条消息,后面的四个每次处理队列中的全部消息,再后面分别是每次处理总长度的 1/2,1/4,1/8 条消息,32 以后的 worker 线程的 weight 一律为 0,也就是每次处理消息队列中的全部消息。
实现
照例先来看一下 worker 线程的主函数。
|
|
在主函数中,会不断调用 skynet_context_message_dispatch
,当它的返回值 q 不为 NULL 时,说明还有消息未处理完毕,它会被再次调用,并且 q 会被当作参数重新传给它。如果 q 为 NULL,则说明当前全局队列的消息已经处理完毕了,worker 线程会阻塞在 cond 上,等待其它线程唤醒它。
|
|
通过调用 skynet_globalmq_pop 从全局队列中取出一个服务队列,全局队列是个链表,此处加锁以后,从链表头部取出即可。
拿到服务消息队列以后,需要从里面拿取消息,并且处理。这个过程在一个循环中,循环的次数跟本线程的工作权重 weight 有关,通过计算当前队列中待处理的消息总长度 n »= weight 得到本次要处理的消息条数。
dispatch_message 用来处理消息,通过移位拿到消息的类型和长度,累加处理消息计数,然后调用 context 的 callback 函数进行处理。
本轮消息处理完毕以后,会尝试获取一个新的服务队列,如果能拿到,不管当前处理的队列中还有没有剩余消息,都会把当前队列 push 到全局队列的末尾,如果拿不到新的队列,则继续处理本队列,即使它其中已经没有消息了,会在下一次检查消息长度的时候直接返回。
如果一开始拿到的要处理的服务队列中本来就没有消息的话,则会把其 in_global 参数设为 0,且不会将其放回全局队列中,而是会尝试从全局队列中再拿取一个队列返回,来进行下一次的处理。这个没有消息也没有在全局队列中的服务队列会一直保持这个状态,直到下一次有其它服务向其发送消息,这时候会重新把其 in_global 参数设为 MQ_IN_GLOBAL,并把其 push 回全局队列中。
当 worker 线程已经从全局队列中取不到服务队列时,它会锁上 monitor 结构的锁,然后累加休眠的线程数 sleep,并且使用 pthread_cond_wait 将线程阻塞在 monitor.cond 上,等待 socket 线程或是 timer 线程唤醒。