--- Start a gate service and have it listen for inbound cluster connections.
--
-- When `port` is nil, `addr` is used as a key into `node_address` and the
-- "host:port" string stored there is split to obtain the real endpoint.
-- Otherwise `addr` and `port` are handed to the gate unchanged.
--
-- @param source  address of the requesting service (not used here)
-- @param addr    host to listen on, or a node name when `port` is nil
-- @param port    port to listen on; nil means "look `addr` up in node_address"
-- Raises an error if the node is unknown/down or its address is malformed.
function command.listen(source, addr, port)
    if port == nil then
        -- Resolve and validate the endpoint BEFORE spawning the gate, so a
        -- failed lookup does not leave an orphaned gate service behind.
        local node = addr
        local address = assert(node_address[node], tostring(node) .. " is down")
        addr, port = string.match(address, "([^:]+):(.*)$")
        assert(addr, string.format("invalid address %q for node %s", tostring(address), tostring(node)))
    end

    local gate = skynet.newservice("gate")
    skynet.call(gate, "lua", "open", { address = addr, port = port })
    skynet.ret(skynet.pack(nil))
end
--- Handle a socket event reported for a cluster connection.
--
-- Only the "open" sub-command is shown in this excerpt: it launches a
-- `clusteragent` service for the newly accepted connection and records it
-- in `cluster_agent`.
--
-- @param source  address of the service that reported the event; it is passed
--                on to the new agent
-- @param subcmd  event name, e.g. "open"
-- @param fd      socket id of the connection
-- @param msg     event payload; for "open" it is logged as the peer description
function command.socket(source, subcmd, fd, msg)
    if subcmd == "open" then
        skynet.error(string.format("socket accept from %s", msg))
        -- new cluster agent
        -- Mark the slot as "agent is being created" before the blocking launch.
        cluster_agent[fd] = false
        local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
        -- Re-read the slot: a truthy value here means it was changed while
        -- newservice was blocked (presumably by the close handler, which is
        -- not shown in this excerpt -- confirm against the full source).
        local closed = cluster_agent[fd]
        cluster_agent[fd] = agent
        if closed then
            -- The connection went away during startup; shut the new agent down.
            skynet.send(agent, "lua", "exit")
            cluster_agent[fd] = nil
        end
    else
        -- ... unrelated code omitted
    end
end
-- Startup function of the clusteragent service.
-- Registers the "client" protocol, asks the gate to forward this fd's traffic,
-- and installs the handler for "lua" control commands.
skynet.start(function()
    skynet.register_protocol {
        name = "client",
        id = skynet.PTYPE_CLIENT,
        unpack = cluster.unpackrequest,
        dispatch = dispatch_request,
    }
    -- fd can write, but don't read fd, the data package will forward from gate through client protocol.
    skynet.call(gate, "lua", "forward", fd)

    skynet.dispatch("lua", function(_, source, cmd, ...)
        if cmd == "exit" then
            -- Close the connection and terminate this agent.
            socket.close(fd)
            skynet.exit()
        elseif cmd == "namechange" then
            -- Replace the name table with a fresh one.
            register_name = new_register_name()
        else
            skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
        end
    end)
end)
--- Send a one-way (push) message to a remote address over the cluster channel.
--
-- The message is packed with `cluster.packpush` using the current `session`
-- value, then written to `channel` with no response expected.
--
-- @param addr  destination passed to cluster.packpush
-- @param msg   message buffer to pack
-- @param sz    size of `msg`
function command.push(addr, msg, sz)
    local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
    if padding then -- is multi push
        -- A multi-part push returned a new session value; keep it for the next pack.
        session = new_session
    end

    -- nil response: a push does not wait for a reply.
    channel:request(request, nil, padding)
end
接收数据

在 gate 服务收到消息数据以后，会调用 handler.message 来处理消息：
--- Gate callback invoked when a complete package has been received on `fd`.
--
-- If the connection has an agent, the raw buffer is redirected to it under
-- the "client" protocol; otherwise the data is copied into a string and
-- reported to the watchdog.
--
-- @param fd   socket id the package arrived on
-- @param msg  raw message buffer
-- @param sz   size of `msg`
function handler.message(fd, msg, sz)
    -- recv a package, forward it
    local c = connection[fd]
    local agent = c.agent
    if agent then
        -- It's safe to redirect msg directly , gateserver framework will not free msg.
        -- Redirect the package to the agent, passing c.client as the source argument.
        skynet.redirect(agent, c.client, "client", fd, msg, sz)
    else
        skynet.send(watchdog, "lua", "socket", "data", fd, skynet.tostring(msg, sz))
        -- skynet.tostring will copy msg to a string, so we must free msg here.
        skynet.trash(msg, sz)
    end
end