skynet 中支持 harbor 和 cluster 两种方式实现集群,本篇主要讨论 cluster,原因是我个人比较喜欢 cluster 这种方式,XD
配置
集群功能需要各个远程主机的配置,这个配置可以通过在 config 文件中指定 cluster 变量为一个配置文件的路径来实现,也可以通过 cluster.reload 来加载。
不管是通过文件还是 cluster.reload 加载配置,配置的格式都是一样的,是一个 Lua 的 table 格式。
1
|
{NodeName = "[xxx.xxx.xxx.xxx]:xxxx"}
|
需要指出的是,配置文件中需要有所有要对外连接节点的配置。
服务
实现 cluster 时使用了很多服务,整个逻辑个人感觉有一点绕。这里先来费尽九牛二虎之力画一个架构图,方便理解。画图苦手已经快顶不住了 /(ㄒoㄒ)/~~
clusterd 是一个通过 skynet.uniqueservice 创建出来的进程唯一服务。它在 cluster 第一次被 require 的时候就会启动。clusterd 提供集群的管理服务,常用的集群接口 cluster.xxx 大部分都是在向 clusterd 发送消息。
每次使用 clusterd 执行 listen 命令时创建会为这个套接字创建一个 gate 服务,它主要负责本端口的套接字监听和数据接收等操作。当有新的连接创建时,gate 会向 clusterd 发送 socket open,clusterd 会为每个连接创建一个 clusteragent 服务来接收后续消息。
clusteragent 在被创建完成以后会向 gate 发送一条 forward 消息,修改 gate 的 connection 中对应的内容来接收后续这个 fd 上的所有数据。它会将后续数据按目标服务的名字转发给已注册过的服务。
clustersender 由 clusterd 创建,当节点有数据需要发送时,clusterd 会为每个对外部节点的连接创建一个 clustersender 服务用来管理这条连接。当有数据要发出给其它节点时,会找到它对应的 sender,使用 sender 将数据发出。
listen
集群中的每个进程都要有一个对外的端口,cluster.open 可以打开端口监听,可以传端口,也可以传配置中有的节点名字。
1
2
3
4
5
6
7
|
function cluster.open(port)
if type(port) == "string" then
skynet.call(clusterd, "lua", "listen", port)
else
skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
end
end
|
这里有一个地方需要注意一下,如果是传端口开启监听的话,绑定的主机地址是 “0.0.0.0”,这个可能会有安全隐患,所以还是使用配置比较好。
1
2
3
4
5
6
7
8
9
|
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
if port == nil then
local address = assert(node_address[addr], addr .. " is down")
addr, port = string.match(address, "([^:]+):(.*)$")
end
skynet.call(gate, "lua", "open", { address = addr, port = port })
skynet.ret(skynet.pack(nil))
end
|
实际上处理监听的逻辑在 clusterd 中,当调用 listen 的时候,会启动一个新的网关服务 gate 来处理网络相关操作。启动好 gate 以后,通过 skynet.call 让 gate 服务执行 open 操作,开启端口的监听。
accept
clusterd 作为了 gate 的 watchdog 提供服务。当有外部连接时,watchdog 的 socket 命令中的 open 子命令会被执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
function command.socket(source, subcmd, fd, msg)
if subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
-- new cluster agent
cluster_agent[fd] = false
local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
local closed = cluster_agent[fd]
cluster_agent[fd] = agent
if closed then
skynet.send(agent, "lua", "exit")
cluster_agent[fd] = nil
end
else
-- ... 省略非相关代码
end
end
|
clusterd 会为每个新连接创建一个 clusteragent 服务来处理本连接的数据,并且将 fd 与 clusteragent 的映射关系保存在 cluster_agent
中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
-- clusteragent 的启动函数
skynet.start(function()
skynet.register_protocol {
name = "client",
id = skynet.PTYPE_CLIENT,
unpack = cluster.unpackrequest,
dispatch = dispatch_request,
}
-- fd can write, but don't read fd, the data package will forward from gate though client protocol.
skynet.call(gate, "lua", "forward", fd)
skynet.dispatch("lua", function(_,source, cmd, ...)
if cmd == "exit" then
socket.close(fd)
skynet.exit()
elseif cmd == "namechange" then
register_name = new_register_name()
else
skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
end
end)
end)
|
clusteragent 会注册 client 的消息类型,然后对 gate 服务调用 forward 操作,把本服务设为连接的 agent 来接管后续的数据。
connect
集群节点之间对外连接是长连接,连接上以后不会主动断开。在节点第一次对外发送消息的时候会向对方发起连接。
在 clusterd 的 node_channel
中保存了全部节点的连接,它的元方法 __index 会调用 open_channel 开启一个对外连接。对外连接是通过另一个服务 clustersender 实现的。
每条对外连接都会有一个对应的 clustersender 服务,它的启动方法如下。
1
2
3
4
5
6
7
8
9
10
11
12
|
skynet.start(function()
channel = sc.channel { -- 这一步只是创建,不会发起连接
host = init_host,
port = tonumber(init_port),
response = read_response,
nodelay = true,
}
skynet.dispatch("lua", function(session , source, cmd, ...)
local f = assert(command[cmd])
f(...)
end)
end)
|
clustersender 最主要的功能就是维护了一条 socket_channel
,当节点中有消息要发出时,clusterd 会根据目标节点的名字找到对应的 clustersender,将消息转发给它,由它的 channel 将数据通过 socket 连接发出去。
对外服务名字
集群节点上的服务可以注册一个对外的名字,一定要在注册名字以后才能接收集群上其它节点的消息。需要使用 cluster.register 来注册对外的名字,这个接口会给 clusterd 发送一条 register 消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
function command.register(source, name, addr)
assert(register_name[name] == nil)
addr = addr or source
local old_name = register_name[addr]
if old_name then
register_name[old_name] = nil
clearnamecache()
end
register_name[addr] = name
register_name[name] = addr
skynet.ret(nil)
skynet.error(string.format("Register [%s] :%08x", name, addr))
end
|
在 clusterd 中,有一个名为 register_name
的 table 保存了全部的对外服务地址和名字,这里要注意的是,如果两次注册同一个名字的话,第二次的服务会替换掉第一次的服务。而且服务的 name 和 addr 不能重复,但是这个应该还好,很难重复。
发送数据
与同节点的不同服务间发送消息的接口类似,cluster.send 可以向指定节点的指定服务发送数据。cluster.call 向指定节点的指定服务执行一个调用。
消息通过在连接时启动的 clustersender 服务进行处理。发送之前需要进行数据打包,都是通过 lua-cluster.c 中的 packrequest 进行打包的。打包好的数据通过之前服务启动时建立的 socket 连接发送出去,代码实现如下。
1
2
3
4
5
6
7
8
|
function command.push(addr, msg, sz)
local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
if padding then -- is multi push
session = new_session
end
channel:request(request, nil, padding)
end
|
接收数据
在 gate 服务收到消息数据以后,会调用 handler.message 来处理消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent
if agent then
-- It's safe to redirect msg directly , gateserver framework will not free msg.
skynet.redirect(agent, c.client, "client", fd, msg, sz) -- 重定向到客户端
else
skynet.send(watchdog, "lua", "socket", "data", fd, skynet.tostring(msg, sz))
-- skynet.tostring will copy msg to a string, so we must free msg here.
skynet.trash(msg, sz)
end
end
|
在 clusteragent 创建好以后,它向 gate 服务发送的 forward 消息会将自己的地址保存在 connection
对应的 agent 中,所以这里会通过上面的 if 分支,调用 skynet.redirect 将消息转发给 clusteragent。
clusteragent 使用 lua-cluster.c 中的 lunpackrequest 进行数据解包。解包以后的数据交给消息分发函数 dispatch_request
进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
-- 经过大量精简后的 dispatch_request
local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
local ok, response
if cluster.isname(addr) then
addr = register_name[addr]
end
if addr then
if is_push then
skynet.rawsend(addr, "lua", msg, sz) -- 将消息发送转指定地址服务
return -- no response
else
if tracetag then
ok, msg, sz = pcall(skynet.tracecall, tracetag, addr, "lua", msg, sz)
tracetag = nil
else
ok, msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
end
end
else
ok = false
msg = "Invalid name"
end
if ok then
response = cluster.packresponse(session, true, msg, sz)
if type(response) == "table" then
for _, v in ipairs(response) do
socket.lwrite(fd, v)
end
else
socket.write(fd, response)
end
else
response = cluster.packresponse(session, false, msg)
socket.write(fd, response)
end
end
|
在 dispatch_request
中,第一次向某个服务发消息之前会去 clusterd 中根据名字查询服务地址,查到以后会保存下来,以后不会再次查询。
因为数据已经打包过了,所以这一步的发送不再需要打包数据了,发送消息使用的是 rawsend 和 rawcall 即可。使用对应方法直接把消息转给通过服务名字查询到的服务地址。
当名字有变更的时候,clusterd 向所有 clusteragent 发送 namechange 消息,agent 会在收到消息以后会将之前缓存的所有名字清空。