NIO源码分析之Selector
NIO的Selector源码第一句话就是 A multiplexor of {@link SelectableChannel} objects.
即 SelectableChannel对象的多路复用器。这很清楚的说明了Selector的作用。
这篇文章主要从以下几个点对Selector进行分析:
- 选择器(Selector)
- 可选择通道(SelectableChannel)
- 选择键(SelectionKey)
- Selector完整实例
Selector
Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
下面是使用Selector管理多个channel的结构图:
- Selector的创建
A selector may be created by invoking the open method of this class, which will use the system’s default selector provider to create a new selector. A selector may also be created by invoking the openSelector method of a custom selector provider. A selector remains open until it is closed via its close method.
可以通过调用此类的open方法来创建选择器,该方法将使用系统的默认选择器提供程序来创建新的选择器。 还可以通过调用自定义选择器提供程序的openSelector方法来创建选择器。 选择器保持打开状态,直到通过其关闭方法关闭。
Selector可以调用静态方法open()来创建Selector
1 | Selector Selector=Selector.open(); |
openSelector() 是通过系统的默认获取
1 | java.nio.channels.spi.SelectorProvider public static SelectorProvider provider() |
为provider的类。
由以上的源码可见它加了锁,是线程安全的
SelectableChannel
SelectableChannel这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此是不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
源码上对它的介绍:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17A channel that can be multiplexed via a Selector.
In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel's registration with the selector.
Once registered with a selector, a channel remains registered until it is deregistered. This involves deallocating whatever resources were allocated to the channel by the selector.
A channel cannot be deregistered directly; instead, the key representing its registration must be cancelled. Cancelling a key requests that the channel be deregistered during the selector's next selection operation. A key may be cancelled explicitly by invoking its cancel method. All of a channel's keys are cancelled implicitly when the channel is closed, whether by invoking its close method or by interrupting a thread blocked in an I/O operation upon the channel.
If the selector itself is closed then the channel will be deregistered, and the key representing its registration will be invalidated, without further delay.
A channel may be registered at most once with any particular selector.
Whether or not a channel is registered with one or more selectors may be determined by invoking the isRegistered method.
Selectable channels are safe for use by multiple concurrent threads.
可以通过选择器进行多路复用的通道。
为了与选择器一起使用,必须首先通过register方法注册该类的实例。此方法返回一个新的SelectionKey对象,该对象表示通道与选择器的注册。
一旦注册选择器,通道将保持注册状态,直到它被注销。这涉及解除分配选择器分配给通道的任何资源。
渠道不能直接注销;相反,必须取消代表其注册的密钥。取消密钥请求在选择器的下一个选择操作期间取消注册该通道。可以通过调用其cancel方法显式取消密钥。当通道关闭时,无论是通过调用其close方法还是通过中断在通道上的I / O操作中阻塞的线程,所有通道的键都会被隐式取消。
如果选择器本身已关闭,则将取消注册该通道,并且表示其注册的密钥将无效,而不会有进一步的延迟。
一个频道最多可以与任何特定选择器一起注册一次。
可以通过调用isRegistered方法来确定是否向一个或多个选择器注册了频道。
多个并发线程可以安全地使用可选择的通道。
SelectableChannel中有两个个方法为register,但是其实是一个方法
因为底层实现都是一样的,只不过第一个方法调用时第三个参数设置为null。
它的抽象类AbstractSelectableChannel实现了这个抽象方法,注释为
1 | 使用给定的选择器注册此通道,返回选择键。 |
要实现Selector管理Channel,需要将channel注册到相应的Selector上,如下:
1 | channel.configureBlocking(false); |
通过调用通道的register()方法会将它注册到一个选择器上。与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式,而套接字通道都可以。另外通道一旦被注册,将不能再回到阻塞状态,此时若调用通道的configureBlocking(true)将抛出BlockingModeException异常。
register()方法的第二个参数是“interest集合”,表示选择器所关心的通道操作,它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。比如一个选择器对通道的read和write操作感兴趣,那么选择器在检查该通道时,只会检查通道的read和write操作是否已经处在就绪状态。
它有以下四种操作类型:
Connect 连接
Accept 接受
Read 读
Write 写
需要注意并非所有的操作在所有的可选择通道上都能被支持,比如ServerSocketChannel支持Accept,而SocketChannel中不支持。我们可以通过通道上的validOps()方法来获取特定通道下所有支持的操作集合。
JAVA中定义了四个常量来表示这四种操作类型:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果Selector对通道的多操作类型感兴趣,可以用“位或”操作符来实现:
int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE;
当通道触发了某个操作之后,表示该通道的某个操作已经就绪,可以被操作。因此,某个SocketChannel成功连接到另一个服务器称为“连接就绪”(OP_CONNECT)。一个ServerSocketChannel准备好接收新进入的连接称为“接收就绪”(OP_ACCEPT)。一个有数据可读的通道可以说是“读就绪”(OP_READ)。等待写数据的通道可以说是“写就绪”(OP_WRITE)。
我们注意到register()方法会返回一个SelectionKey对象,我们称之为键对象。下面会对键对象详细说明。
ServerSocketChannel
源码上的介绍:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23A selectable channel for stream-oriented listening sockets.
A server-socket channel is created by invoking the open method of this class. It is not possible to create a channel for an arbitrary, pre-existing ServerSocket. A newly-created server-socket channel is open but not yet bound. An attempt to invoke the accept method of an unbound server-socket channel will cause a NotYetBoundException to be thrown. A server-socket channel can be bound by invoking one of the bind methods defined by this class.
Socket options are configured using the setOption method. Server-socket channels support the following options:
Option Name
Description
SO_RCVBUF
The size of the socket receive buffer
SO_REUSEADDR
Re-use address
Additional (implementation specific) options may also be supported.
Server-socket channels are safe for use by multiple concurrent threads.
面向流的侦听套接字的可选通道。
通过调用此类的open方法创建服务器套接字通道。 无法为任意预先存在的ServerSocket创建通道。 新创建的服务器套接字通道已打开但尚未绑定。 尝试调用未绑定的服务器套接字通道的accept方法将导致抛出NotYetBoundException。 可以通过调用此类定义的绑定方法之一来绑定服务器套接字通道。
使用setOption方法配置套接字选项。 服务器套接字通道支持以下选项:
选项名称
描述
SO_RCVBUF
套接字接收缓冲区的大小
SO_REUSEADDR
重复使用地址
还可以支持其他(特定于实现的)选项。
服务器套接字通道可供多个并发线程使用。ServerSocketChannel继承了AbstractSelectableChannel
AbstractSelectableChannel继承了SelectableChannel,所以,ServerSocketChannel注册到Selector上
SocketChannel
源码上的介绍:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43A selectable channel for stream-oriented connecting sockets.
A socket channel is created by invoking one of the open methods of this class. It is not possible to create a channel for an arbitrary, pre-existing socket. A newly-created socket channel is open but not yet connected. An attempt to invoke an I/O operation upon an unconnected channel will cause a NotYetConnectedException to be thrown. A socket channel can be connected by invoking its connect method; once connected, a socket channel remains connected until it is closed. Whether or not a socket channel is connected may be determined by invoking its isConnected method.
Socket channels support non-blocking connection: A socket channel may be created and the process of establishing the link to the remote socket may be initiated via the connect method for later completion by the finishConnect method. Whether or not a connection operation is in progress may be determined by invoking the isConnectionPending method.
Socket channels support asynchronous shutdown, which is similar to the asynchronous close operation specified in the Channel class. If the input side of a socket is shut down by one thread while another thread is blocked in a read operation on the socket's channel, then the read operation in the blocked thread will complete without reading any bytes and will return -1. If the output side of a socket is shut down by one thread while another thread is blocked in a write operation on the socket's channel, then the blocked thread will receive an AsynchronousCloseException.
Socket options are configured using the setOption method. Socket channels support the following options:
Option Name
Description
SO_SNDBUF
The size of the socket send buffer
SO_RCVBUF
The size of the socket receive buffer
SO_KEEPALIVE
Keep connection alive
SO_REUSEADDR
Re-use address
SO_LINGER
Linger on close if data is present (when configured in blocking mode only)
TCP_NODELAY
Disable the Nagle algorithm
Additional (implementation specific) options may also be supported.
Socket channels are safe for use by multiple concurrent threads. They support concurrent reading and writing, though at most one thread may be reading and at most one thread may be writing at any given time. The connect and finishConnect methods are mutually synchronized against each other, and an attempt to initiate a read or write operation while an invocation of one of these methods is in progress will block until that invocation is complete.
用于面向流的连接套接字的可选通道。
通过调用此类的一个打开方法来创建套接字通道。无法为任意预先存在的套接字创建通道。新创建的套接字通道已打开但尚未连接。尝试在未连接的通道上调用I / O操作将导致抛出NotYetConnectedException。可以通过调用connect方法连接套接字通道;连接后,插座通道保持连接状态,直到它关闭。是否连接套接字通道可以通过调用其isConnected方法来确定。
套接字通道支持非阻塞连接:可以创建套接字通道,并且可以通过connect方法启动建立到远程套接字的链接的过程,以便稍后通过finishConnect方法完成。可以通过调用isConnectionPending方法来确定连接操作是否正在进行。
套接字通道支持异步关闭,这类似于Channel类中指定的异步关闭操作。如果套接字的输入端被一个线程关闭而另一个线程在套接字通道上的读操作中被阻塞,那么被阻塞线程中的读操作将完成而不读取任何字节并返回-1。如果套接字的输出端被一个线程关闭而另一个线程在套接字通道上的写操作中被阻塞,则被阻塞的线程将收到AsynchronousCloseException。
使用setOption方法配置套接字选项。套接字通道支持以下选项:
选项名称
描述
SO_SNDBUF
套接字发送缓冲区的大小
SO_RCVBUF
套接字接收缓冲区的大小
SO_KEEPALIVE
保持连接活着
SO_REUSEADDR
重复使用地址
SO_LINGER
如果存在数据则关闭(仅在阻止模式下配置时)
TCP_NODELAY
禁用Nagle算法
还可以支持其他(特定于实现的)选项。
套接字通道可以安全地由多个并发线程使用。它们支持并发读写,但最多只有一个线程可能正在读取,并且最多一个线程可能在任何给定时间写入。 connect和finishConnect方法彼此相互同步,并且在调用其中一个方法时尝试启动读取或写入操作将阻塞,直到该调用完成。
SocketChannel 继承了AbstractSelectableChannel,同ServerSocketChannel一样,可以注册到Selector。
SelectionKey
选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。
源码中这样说:
1 | A token representing the registration of a SelectableChannel with a Selector. |
下图格式较为清晰:
可见interestOps() 和 readyOps()都是返回的int类型,其实他们返回的是上面OP_READ等四个常量的 “|”。
判断集合中是否有某一个常量,可以使用“&”来判断,如
1 | (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT |
返回值为true则代表存在。
interest集合是Selector感兴趣的集合,用于指示选择器对通道关心的操作,可通过SelectionKey对象的interestOps()获取。最初,该兴趣集合是通道被注册到Selector时传进来的值。该集合不会被选择器改变,但是可通过interestOps()改变。
我们可以通过以下方法来判断Selector是否对Channel的某种事件感兴趣:
1 | int interestSet=selectionKey.interestOps(); |
ready 集合是通道已经就绪的操作的集合,表示一个通道准备好要执行的操作了,可通过SelctionKey对象的readyOps()来获取相关通道已经就绪的操作。它是interest集合的子集,并且表示了interest集合中从上次调用select()以后已经就绪的那些操作。(比如选择器对通道的ready,write操作感兴趣,而某时刻通道的read操作已经准备就绪可以被选择器获知了,前一种就是interest集合,后一种则是ready集合。)。JAVA中定义以下几个方法用来检查这些操作是否就绪:
1 | //int readSet=selectionKey.readOps(); |
需要注意的是,通过相关的选择键的readyOps()方法返回的就绪状态指示只是一个提示,底层的通道在任何时候都会不断改变,而其他线程也可能在通道上执行操作并影响到它的就绪状态。另外,我们不能直接修改ready集合。
取出SelectionKey所关联的Selector和Channel
通过SelectionKey访问对应的Selector和Channel:
1 | Channel channel =selectionKey.channel(); |
关于取消SelectionKey对象的那点事
我们可以通过SelectionKey对象的cancel()方法来取消特定的注册关系。
该方法调用之后,该SelectionKey对象将会被”拷贝”至已取消键的集合中,该键此时已经失效,但是该注册关系并不会立刻终结。在下一次select()时,已取消键的集合中的元素会被清除,相应的注册关系也真正终结。
为SelectionKey绑定附加对象
可以将一个或者多个附加对象绑定到SelectionKey上,以便容易的识别给定的通道。通常有两种方式:
在注册的时候直接绑定:
1
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
在绑定完成之后附加:
1
selectionKey.attach(theObject);//绑定
绑定之后,可通过对应的SelectionKey取出该对象:
selectionKey.attachment();。
如果要取消该对象,则可以通过该种方式:
selectionKey.attach(null).
需要注意的是如果附加的对象不再使用,一定要人为清除,因为垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。
一个单独的通道可被注册到多个选择器中,有些时候我们需要通过isRegistered()方法来检查一个通道是否已经被注册到任何一个选择器上。 通常来说,我们并不会这么做。
- 通过Selector选择通道
我们知道选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。接下来我们简单的了解一下Selector维护的三种类型SelectionKey集合:
我们知道选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。接下来我们简单的了解一下
Selector维护的三种类型SelectionKey集合:(重点)
源码中已经说明了一切:
1 | A multiplexor of SelectableChannel objects. |
- 密钥集(key set)包含表示此选择器的当前通道注册的键。该方法由keys方法返回。
- 所选择的密钥集(selected-key set)是一组密钥,使得检测到每个密钥的信道准备好用于在先前选择操作期间在密钥的兴趣集中识别的至少一个操作。这个集由selectedKeys方法返回。选定键集始终是键集的子集。
- 取消密钥集( cancelled-key set)是已取消但其通道尚未取消注册的密钥集。此套装无法直接访问。取消密钥集始终是密钥集的子集。
在刚初始化的Selector对象中,这三个集合都是空的。通过Selector的select()方法可以选择已经准备就绪的通道(这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:
select():阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
selectNow():非阻塞,只要有通道就绪就立刻返回。
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合。如下:
1 | Set selectedKeys=selector.selectedKeys(); |
进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:
1 | Set selectedKeys = selector.selectedKeys(); |
关于Selector执行选择的过程(重点)
我们知道调用select()方法进行通道,现在我们再来深入一下选择的过程,也就是select()执行过程。当select()被调用时将执行以下几步:
首先检查已取消键集合,也就是通过cancle()取消的键。如果该集合不为空,则清空该集合里的键,同时该集合中每个取消的键也将从已注册键集合和已选择键集合中移除。(注意:一个键被取消时,并不会立刻从集合中移除,而是将该键“拷贝”至已取消键集合中,这种取消策略就是我们常提到的“延迟取消”。)
再次检查已注册键集合(准确说是该集合中每个键的interest集合)。系统底层会依次询问每个已经注册的通道是否准备好选择器所感兴趣的某种操作,一旦发现某个通道已经就绪了,则会首先判断该通道是否已经存在在已选择键集合当中,如果已经存在,则更新该通道在已注册键集合中对应的键的ready集合,如果不存在,则首先清空该通道的对应的键的ready集合,然后重设ready集合,最后将该键存至已注册键集合中。这里需要明白,当更新ready集合时,在上次select()中已经就绪的操作不会被删除,也就是ready集合中的元素是累积的,比如在第一次的selector对某个通道的read和write操作感兴趣,在第一次执行select()时,该通道的read操作就绪,此时该通道对应的键中的ready集合存有read元素,在第二次执行select()时,该通道的write操作也就绪了,此时该通道对应的ready集合中将同时有read和write元素。
深入已注册键集合的管理(重点)
由上面贴出的源码可以知道。通过选择操作将键添加到选定键集。可以通过调用set的remove方法或通过调用从set中获取的迭代器的remove方法,直接从selected-key集中删除键。密钥永远不会以任何其他方式从选定密钥集中删除;特别是,它们不会作为选择操作的副作用而被删除。密钥可能无法直接添加到选定密钥集。
首先要记住:选择器不会主动删除被添加到已选择键集合中的键,而且被添加到已选择键集合中的键的ready集合只能被设置,而不能被清理。如果我们希望清空已选择键集合中某个键的ready集合该怎么办?我们知道一个键在新加入已选择键集合之前会首先置空该键的ready集合,这样的话我们可以人为的将某个键从已注册键集合中移除最终实现置空某个键的ready集合。被移除的键如果在下一次的select()中再次就绪,它将会重新被添加到已选择的键的集合中。这就是为什么要在每次迭代的末尾调用
1 | keyIterator.remove() |
停止选择
选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。
通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回
该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
通过close()方法关闭Selector
该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。
调用interrupt()
调用该方法会使睡眠的线程抛出InterruptException异常,捕获该异常并在调用wakeup()
Selector完整实例
这里我们结合ServerSocketChannel和Selector构建简单的服务器,下面是完整的代码示例。
服务端代码:
1 | public class ServerSocketChannelTest { |
客户端代码:
1 | public class SocketChannelTest { |
本文参考了:https://blog.csdn.net/dd864140130/article/details/50299687,修正了其中的错误,并以源代码为依据,重新梳理了整个Selector。整个过程花费时间长,收获也很多,特别是对Selector,SelectorKey有了更深一个层次的了解。