Skynet设计原理
Skynet框架是属于一种多核并发编程模型,常见的多核并发模型有:
多线程模型
在一个进程中开启多线程,为了充分利用多核,一般设置工作线程的个数为 CPU的核心数;
Memcached
就是采用这种方式;多线程在一个进程当中,所以数据共享来自进程当中的内存;这里会涉及到很多临界资源的访问,所以需要考虑加锁, 小粒度考虑使用自旋锁;
多进程模型
在一台机器当中,开启多个进程充分利用多核,一般设置工作进程的个数为 CPU的核心数;
Nginx 就是采用这种方式;
Nginx 当中的worker进程,通过共享内存来进行共享数据;也需要考虑使用锁;比如
ngx_shmtx_t
依次尝试使用自旋锁、信号量、文件锁。CSP模型
CSP(Communicating Sequential Processes), 以 go 语言为代表,并发实体是协程(用户态线程、轻量级线程), 使用管道channel来通信;内部也是采用多少个核心开启多少个内核线程来充分利用多核;
Actor模型
Erlang 从语言层面支持 Actor 并发模型,并发实体是 Actor(在 Skynet中称之为服务),他们通过发送消息来相互通信;Skynet采用 C + Lua来实现 Actor 并发模型;底层也是通过采用多少个核心开启多少个内核线程来充分利用多核;
总结 :
不要通过共享内存来通信,而应该通过通信来共享内存;
CSP 和 Actor 都符合这一哲学;
通过通信来共享数据,其实是一种解耦合的过程;并发实体之间可以分别开发并进行单独优化,而它们唯一的耦合在于消息;这能让我们快速地进行开发;同时也符合我们开发的思路,将一个大的问题拆分成若干个小问题;
Skynet框架
主要就是Actor模块的封装。它是一个轻量级游戏服务器框架,而不仅仅用于游戏,还有金融、监控等;
轻量级体现在:
- 仅实现 actor 模型,以及相关的脚手架(工具集),如消息队列通信、调度、广播、资源共享等(
lualib
目录下的功能HTTP、Websocket、DB、Sharedata
等也属于此列); - 实现了服务器框架的基础组件;
实现了 reactor 并发网络库;并提供了大量连接的接入方案;
基于自身网络库,实现了常用的数据库驱动(异步连接方案),并融合了Lua 数据结构;
环境准备
centos :
yum install -y git gcc readline-devel autoconf
ubuntu :
apt-get install git build-essential readline-dev autoconf
# 或者
apt-get install git build-essential libreadline-dev autoconf
Mac:
brew install git gcc readline autoconf
Actor模型
有消息的 Actor 为活跃的 Actor,没有消息为非活跃的 Actor;
Actor定义
- 用于并行计算;可以理解为用户态轻量级进程。
- Actor 是最基本的计算单元和并发单元, 有自己的状态访谈变量和行为;
- 基于消息计算,有自己的Mailbox消息队列、消息处理回调函数;
- Actor 通过消息进行沟通
Actor的组成
隔离的环境
主要通过 Lua 虚拟机来实现;
消息队列
用来存放有序(先后到达)的消息;
消息处理回调函数
用来运行 Actor;从 Actor 的消息队列中取出消息,并作为该回调函数的参数来运行 Actor;
Actor 创建
skynet
启动服务流程
函数调用 | 源码路径 |
---|---|
skynet.newservice | lualib/skynet.lua |
command.LAUNCH | service/launcher.lua (以下都在launcher 服务中调用) |
skynet.launch | lualib/skynet/manager.lua |
lcommand | lualib-src/lua-skynet.c |
cmd_launch | skynet-src/skynet_server.c |
skynet_context_new | skynet-src/skynet_server.c |
Actor 底层关键接口
// 对于snlua服务来说,用于创建隔离的环境
void * skynet_module_instance_create(struct skynet_module *m);
// 用于设置初始回调函数和给自己发送初始化消息
int skynet_module_instance_init(struct skynet_module *m, void * inst, struct
skynet_context *ctx, const char * parm);
// 用于释放 Actor 对象
void skynet_module_instance_release(struct skynet_module *m, void *inst);
// 用于处理 信号 消息,比如中断死循环
void skynet_module_instance_signal(struct skynet_module *m, void *inst, int
signal);
Actor 运行
skynet.start
会设置Actor消息回调函数,一个消息执行时会获取一个协程执行它。设置ctx->cb
消息回调关键:
函数 | 源码路径 |
---|---|
skynet.start | skynet.lua |
c.callback(skynet.dispatch_message) | skynet.lua |
lcallback | lualib-src/lua-skynet.c |
skynet_callback | skynet-src/skynet_server.c |
一切从thread_worker
函数开始运行,每个消息都会有一个协程来执行它。
函数 | 源码路径 |
---|---|
thread_worker | skynet-src/skynet_start.c |
skynet_context_message_dispatch | skynet-src/skynet_server.c |
skynet_mq_pop | skynet-src/skynet_mq.c |
dispatch_message | skynet-src/skynet_server.c |
ctx->cb | skynet-src/skynet_server.c |
skynet.dispatch_message | skynet.lua 以Lua消息为例 |
skynet.raw_dispatch_message | skynet.lua |
co_create | skynet.lua |
skynet.dispatch 所注册的协程入口函数 | skynet.lua |
Lua虚拟机有一个限制,同时只有一个协程在运行。所以写代码时以单线程思维方式在组织代码。
内核线程取出消息队列,找到Lua虚拟机,从协程池取出一个协程来执行消息运行
Actor 消息
Actor 模型基于消息计算,在 Skynet 框架中,消息包含 Actor (之间)消息、网络消息以及定时消息;
Actor 之间消息
-- addr 对端服务的地址
-- typename 消息类型 actor内部间通常为 lua 类型消息
-- ... 为可变参
-- skynet.send是百分百会到达的, 异步消息,不等待对方反馈
skynet.send(addr, typename, ...)
-- addr 对端服务的地址
-- typename 消息类型 actor内部间通常为 lua 类型消息
-- ... 为可变参
-- skynet.call就是发起一次远程调用,调用者会被挂起,等待对方回应后唤醒并处理返回值
-- 注意:
-- 对端需要显示调用 skynet.ret(...) 回应 skynet.call 的请求
-- 或者通过调用 skynet.response() 延迟回应 skynet.call 的请求
skynet.call(addr, typename, ...)
网络消息
Skynet 当中采用一个 socket 线程来处理网络信息;Skynet 基于 reactor 网络模型;
问题:网络当中获取数据,怎么知道FD数据传递到哪个服务(Actor)的消息队列当中去?
// 在 linux 系统中,采用 epoll 来检测管理网络事件;
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
// epoll常见事件:EPOLLIN,EPOLLOUT,EPOLLHUP,EPOLLERR
// 注意右边的就绪队列是事件队列,epoll_wait是从就绪队列中取事件
通过epoll_ctl
设置struct epoll_event
中data.ptr = (struct socket *)ud;
来完成fd与Actor绑定。
有一个slot池,不和Redis一样用fd
来创建大池。通过定义域相对较小的id创建预分配socket池。Lua层只能访问到slot idx
来当作FD
Skynet的Lua逻辑通过socket.start(fd, func)
来完成Actor与FD的绑定
定时消息
定时器线程发送给Actor的消息。工作线程通过skynet_timeout
函数调用timer_add
函数在时间轮加入新定时器。当时间到达时,在dispatch_list
处理并调用skynet_context_push
给工作线程压入超时消息。
Skynet 采用多层级时间轮来解决多线程环境下定时任务的管理;时间复杂度为O(1) ;
当定时任务被触发,将会向目标 Actor 发送定时消息,从而驱动 Actor 的运行;
网络消息推送到 Actor
函数 | 源码 | 备注 |
---|---|---|
skynet_socket_poll | skynet-src/socket_server.c | socket线程循环主要工作 |
socket_server_poll | skynet-src/socket_server.c | 处理外部客户端和内部管道数据 |
forward_message | skynet-src/skynet_socket.c | 将消息推送到所属Actor消息队列 |
ctrl_cmd | skynet-src/socket_server.c | 处理work线程通过PIPE发送来数据 |
sp_wait | skynet-src/socket_epoll.h | 阻塞处理IO事件 |
report_connect | skynet-src/socket_server.c | 连接第三方服务 建立成功的标识 |
report_accept | skynet-src/socket_server.c | 接收到客户端的连接,在这里可以绑定不同的client对应不同的服务 |
forward_message_tcp | skynet-src/socket_server.c | 读事件 |
send_buffer | skynet-src/socket_server.c | 写事件,把写缓存区发送出去 |
Actor的调度
工作线程流程
全局单向队列和Actor消息队列都是先进先出,前者保证不饿死,后者保证顺序处理
工作线程从全局队列中 pop 出单个 Actor 消息队列;从 Actor 消息队列中按照规则 pop 出一定
数量的消息进行执行;若 Actor 消息队列中仍有消息继续放入全局队列队尾;若 Actor 消息队列中
没有消息则不放入全局队列中;全局队列只存活跃的 Actor 消息队列;非活跃的Actor不在全局队列中。
这个流程和Nginx线程池工作原理一致
工作线程权重
工作线程数量是按照 CPU 核心数来设置的;工作线程按照下面工作线程权重图来设置每个工作线
程的权重;
// 工作线程权重图,32核
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,// 前4个线程只消耗1个,后4个线程全部消耗
1, 1, 1, 1, 1, 1, 1, 1, // 一次消耗1/2个消息
2, 2, 2, 2, 2, 2, 2, 2, // 1/4
3, 3, 3, 3, 3, 3, 3, 3, }; // 1/8
工作线程执行规则
int i,n=1;
for (i=0; i<n; i++) {
// 注意: skynet_mq_pop pop出消息则返回0,没有pop消息返回1
if (skynet_mq_pop(q, &msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {// 在执行第1次时就计算出本轮会分发几个消息
n = skynet_mq_length(q);
n >>= weight;// n >> 0 = n , n >> 1 = n/2
}
...
// 调用 actor 回调函数消费消息
dispatch_message(ctx, &msg);
}
从上面逻辑可以看出,当工作线程的权重为 -1 时,该工作线程每次只 pop 一条消息;当工作线程的权重为 0 时,该工作线程每次消费完所有的消息(防止消息多的Actor频繁线程切换);当工作线程的权重为 1 时,每次消费消息队列中 1/2的消息;当工作线程的权重为 2 时,每次消费消息队列中1/4 的消息;以此类推;通过这种方式,完成消息队列梯度消费,从而不至于让某些队列过长;这种消息调度的方式不是最优的调度方式(相较于 go 语言,Go可以绑定线程等),云风也在尝试修改更优的方式来调度;但是目前从多年线上实践情况来看,Skynet 运行良好;
调度问题
关注锁的使用,参考多核并发编程当中Nginx实现
- 多个工作线程从全局消息队列中取次级消息队列,应该采用什么锁?自旋
- 当 Skynet 全局消息队列节点很少的时候,怎么让多余的工作线程得到休眠?互斥+条件变量
- 在问题 2 的基础上,如果此时全局消息队列节点很多后,怎么让休眠的工作线程得到唤醒?定时
Skynet多个虚拟机共享函数原型、字符串等,实现一个虚拟机200KB左右内存。
总体原理图
关于并发和并行
并行定义为一个时间点是有多个任务同时被处理。
并发定义为一个时间段内多个任务被处理了。类似于一个CPU通过切换任务可以达到在一个时间段内处理多个队列。Skynet框架并发体现在:
- 内部调度Actor时就是并发模型
- 在socket数据分发使用的reactor模型
- 多线程并行处理成千上万的Actor
评论 (0)