Netty
是一个封装了JAVA NIO
的异步网络应用框架,它简化了我们网络编程,同时保证了高可用和高性能,因此我认为学习netty
对以后深入一些RPC框架,或者接触一些网络通信都大有裨益。
今天就是想从源码角度学习一下netty
的reactor
线程模型,这应该是netty
的核心之一,整篇文章包含的只是我作为一个初学者的粗浅见解,同时也借鉴了大量的博客文章,因此对部分内容认识不够深入,或者存在逻辑不通的情况,希望能够谅解。
Reactor简介
在接触netty
之前,首先先介绍一些基础知识,让我们了解一下什么是Reactor
模式
维基百科的解读如下
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
翻译总结主要有这么几点
- reactor是一种事件驱动的设计模式
- reactor可以同步地接受多个输入源
- reactor以多路复用的模式分发多个请求到对应的处理器上去
让我们拿实际情景做例子。
在传统BIO
中,一旦遇到多个事件请求并发,事件监听器只会将后面的线程阻塞,等到当前线程处理结束后才能继续进行监听接受后续请求,这样会大大降低吞吐量,增大系统的负载,在高并发的场景下容易造成明显的数据延迟。
因此在NIO
中将耗时的IO
处理操作和请求的监听接受分离开来,由一个线程专门监听事件请求,当接受一个请求后再开启一条线程专门处理IO
请求,从而主线程不阻塞,达到了非阻塞。
至此,Reactor
的基本架构已经出来了,在上述文字中已经隐含了Reactor
的三种基础角色:
- Reactor 将I/O事件分派给对应的Handler
- Acceptor 处理客户端新连接,并分派请求到处理器链中
- Handlers 执行非阻塞读/写 任务
经典的reactor
线程模型如图所示,
但是这只是最初级的架构,针对具体复杂的场景,还需要做出诸多优化,比如构建处理IO
请求的线程池、将监听事件和分发事件进一步解耦、进一步减少资源开销等,这些在这里就不再展开了。
netty
中核心的reactor
线程模型就是一个相对更成熟和高性能的模型,它的架构更像是下图,是一个多reactor
多线程的模型。
其中mainReactor
主要是用来处理网络IO
连接建立操作,通常一个线程就可以处理,而subReactor
主要做和建立起来的socket
做数据交互和事件业务处理操作,它的个数上一般是和CPU
个数等同,每个subReactor
一个线程来处理。
关于reactor
模式的暂时就介绍这么多了,其实关于事件的处理还有其他的处理模式。在Douglas Schmidt
的作品《POSA2》中提到了有四种事件处理模式:
- Reactor
- Proactor
- Asynchronous Completion Token
- Acceptor-Connector
这些以后有时间再研究,今天就先专注netty
的事件处理
Reacor线程的启动和创建
得益于netty
良好的封装,使得我们只要接触过netty
,相信都能直接感受其中reactor
模式的存在,下面给一个简单的netty
服务端的启动demo
(本人环境 netty-all-4.1.48 ):
1 | public class NettyServer { |
其中最重要的就是NioEventLoopGroup
,因为在这里我们创建了事件的监听和分发器。那什么是NioEventLoopGroup
呢?通过词义可以看到词根是EventLoop
, 在Js
中我们知道Eventloop
就是Js
的运行机制,是通过单线程循环调度处理事件,这其实也和reactor
模式中用单线程监听接受事件非常相似,那再netty
中是否就是这个意思呢?让我们先往下看
NioEventLoopGroup
因此,现在就让我们扒一扒NioEventLoopGroup
的源码,看看里面做了什么。首先进入NioEventLoopGroup
,发现它继承了MultithreadEventLoopGroup
,先不急看父类,看一下它的构造函数:
1 | public class NioEventLoopGroup extends MultithreadEventLoopGroup { |
观察一下其中的传入参数,其中大部分参数都被设置了默认值
nThreads
:表示线程池中的线程数,注意这里的线程并非是我们常见的线程,大家先这个thread看成是某个实体,接下来会揭晓这个实体是什么executor
: 传入的线程池实例,这个暂时不表,只要知道是给上述那个实体用的selectorProvider
: 我们需要通过它来实例化 JDK 的Selector
,也就是说那个实体持有selectorselectStrategyFactory
:关于selector
如何select
的策略,之后等揭晓了这个实体之后会做出解答
其他参数
chooserFactory
: 选择从线程池中选择线程(那个实体)的策略rejectedExecutionHandler
: 线程池中出现拒绝执行时的策略,netty
的默认拒绝策略是抛出异常
接下里我们进入父类查看构造方法
1 | private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); |
继续进入父类查看
1 | //进入下面的构造方法 |
到这里MultithreadEventExecutorGroup
会先设置一个ThreadPerTaskExecutor
,就是上面所说的executor
再设置一个chooserFactory
,用来实现在线程池中选择线程的选择策略,这部分代码比较简单,就不贴出来了
接着this()
就会进入正式配置的构造方法
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { |
接着进入上面的newChild()
方法,实际上是进入的NioEventLoop
的方法
1 | protected EventLoop newChild(Executor executor, Object... args) throws Exception { |
至此可以揭晓谜底了,线程池中的线程,那个实体指的就是NioEventLoop
!所以我们整段代码看下来,可以得出结论,NioEventLoopGroup
只是一个创建NioEventLoop
的实体池,而很多构造方法里的参数最终也是要进入NioEventLoop
进行使用。
其实我们看词义和继承关系也能看出个大概,NioEventLoop
继承了MultithreadEventLoopGroup
,而NioEventLoop
继承了SingleThreadEventLoop
,也是多和一的关系。
NioEventLoop
那接下来我们就要查看NioEventLoop
的源码,康康里面究竟卖着什么葫芦药
首先看看它的构造方法
1 |
|
发现了很多老面孔了,这些都是直接从外层传递进来的,但是也有一些比较重要的东西,比如selector
,这是reactor
的重要角色
现在我们其实对NioEventLoop
的具体工作流程尚不了解,但是通过对构造函数的分析,相信大家也都明白了这个实体类的重要性。这里做一下总结,接下来会继续分析NioEventLoop
的具体工作
- 从构造上可以看出,
NioEventLoop
实际上是一个线程池,只不过是一个单线程的线程池,里面持有一个Selector
,负责Reactor
中最重要的角色 - 我们利用
NioEventLoopGroup
构建NioEventLoop
,NioEventLoopGroup
是一个NioEventLoop
池,默认创建2*核心数
个NioEventLoop
,同时,客户端创建一个NioEventLoopGroup
,服务端创建两个NioEventLoopGroup
,实际上是多reactor模型,对事件的接受和分发做解耦,这个之后回顾整体架构的时候再细说
执行
在netty
服务端的demo
启动后,代码会执行到NioEventLoop
的run
方法,让我们直接进入run
方法查看eventloop
是如何工作的:
1 | protected void run() { |
重点关注其中的一个 switch(strategy)
,一个 if (ioRatio == 100)
这里其实主要涉及到三个操作、
select()
:轮询,接受注册到reactor
线程上的事件processSelectedKeys()
:处理产生网络IO
事件的channel
runAllTasks()
:处理任务队列
这三个操作就是NioEventLoop
的核心操作,实际上就包含了channel
的接受,分发和处理,下面就分别介绍一下这三个操作
select()
截取run方法中的一部分
1 | switch(strategy) { |
这里主要关注select
分支,首先要注意的是select
是一个阻塞方法,返回值表示多少channel
准备就绪,可以进入处理。
看一看select
前后处理逻辑,首先设置当前最迟的轮询时间,然后进入select
方法
1 | private int select(long deadlineNanos) throws IOException { |
netty
里面定时任务队列是按照延迟时间从小到大进行排序,每次获取任务都是从延迟时间最小的开始获取。select
就是通过判断是否有任务正在排队来决定是否进行select
这里,如果时间小于0.995s
,表示截止事件时间快到了,那就立刻select
一次,调用selectNow()
方法。
其中还设置了下一次唤醒时间,如果截止时间到了但是任务还没有被处理,就会自动唤醒
·
processSelectedKeys()
让我们进入processSelectedKeys()方法看看里面干了什么
1 | private void processSelectedKeys() { |
我们知道SelectionKey
代表事件返回的对象,里面有感兴趣事件集合,准备就绪的事件集合,channel
,selector
,attachment
等
优化过的SelectionKey
和正常的SelectionKey
的区别在于netty
重写了SelectedSelectionKeySet
的selectedKeys
属性和publicSelectedKeys
属性,使得原来的set变成数组,降低了修改元素的事件复杂度。
看一下processSelectedKeysOptimized()
的代码:
1 | private void processSelectedKeysOptimized() { |
什么时候需要再次轮询?
- 在
channel
从selector
上移除的时候,调用cancel
函数将key
取消,并且当被去掉的key
到达CLEANUP_INTERVAL
的时候,设置needsToSelectAgain
为true
,CLEANUP_INTERVAL
默认值为256
- 因此每满
256
次会将selectedKeys
的内部数组全部清空,然后重新selectAgain
重新装载selectionKey
runTasks()
先看run
方法的代码片段
1 | ++selectCnt; |
- 如果
ioRatio
为100
同时就直接执行IO
操作,并最后处理返回的任务 - 如果
ioRatio
不是100
就计算io
处理限制时间,设置处理返回任务的截止时间,让处理I/O
事件的时间和执行任务的时间为1:1。
再看runAllTasks
的实现之前,先要明白task
究竟是什么。在netty
中一共有两种任务,一种是普通任务,一种是定时任务。这里先抛出结论:普通任务存放在taskQueue
中,定时任务存放在PriorityQueue
中,这部分的代码追踪暂时就不写出来了,让我们首先关注如何去运行致谢tasks
吧
进入runAllTasks()
看一下:
1 | protected boolean runAllTasks(long timeoutNanos) { |
整体逻辑还算简单,主要先看看fetchFromScheduledTaskQueue()
里面做了什么
1 | private boolean fetchFromScheduledTaskQueue() { |
之前说过task
包含普通任务和定时任务,分别处在不同的队列中。可见这个函数的主要作用就是将定时任务添加到普通队列中去,这样后面处理任务的时候就可以统一一个队列中获取。
为什么是放在普通队列中?因为定时队列是按照时间优先级的顺序排列,普通任务也无法正常的插入定时任务队列,反之快到截止时间的定时任务是可以看作是普通任务来处理的。
到这里runTasks
的逻辑也基本讲完了。总结一下主要干了这些事
- 协调
IO
时间和处理任务的时间,计算处理任务需要的deadline
截止时间 - 聚合定时任务和普通任务,循环取出任务执行
- 每执行
64
次任务就检查一次截止时间,到期退出循环 - 在
afterRunningAllTasks()
中,做一些收尾动作
总结
至此,EventLoop
上的所有任务都开始执行了,整个流程也结束了。·
最后让我们看看reactor
模式及对应下各个角色的具体实现,让我们再看一眼
Reactor
线程模型的主要实现实体:NioEventLoop
client
: 请求的事件,可以理解是一个channel
acceptor
: selector
,主要指selector
的select()
负责的监听并接受事件
mainReactor
和sunReactor
:在服务端会绑定两个NioEventLoopGroup
:parentGroup
和childGroup
,一般在parentGroup
中处理accept
事件,再childGroup
中处理其他事件。IO
事件的处理主要是在processSelectedKeys()
函数里,判断SelectionKey
的类型,并交给不同的函数处理。
queued tasks
:当建立了连接后,待处理任务就会进入tasks queue
,等到再runTasks()
中循环队列处理,每个任务对应从threadpool
中取出一条worker thread
进行处理