在之前的文章当中,我们分析了ServerBootstrap.bind当中究竟干了些什么,并且找到了netty当中使用java nio的具体地方。今天的内容,重点讨论一下,当一个客户端要connect我们的server的时候,netty究竟具体做了哪些事情。
先看一下java nio的代码,这个当中做了这么几件事情,
selector.select -> 遍历 selector.selectedKeys() -> 判断 key.isAcceptable() -> accept并生成一个 SocketChannel 对象 -> config and register
if(selector.select(TimeOut)==0){
System.out.println("."); continue; } // 获得就绪信道的键迭代器 Iterator<SelectionKey> keyIter=selector.selectedKeys().iterator(); // 使用迭代器进行遍历就绪信道 while(keyIter.hasNext()){ System.out.println("Something happened "); SelectionKey key=keyIter.next(); // 这种情况是有客户端连接过来,准备一个clientChannel与之通信 if(key.isAcceptable()){ System.out.println("Accept in"); SocketChannel clientChannel=((ServerSocketChannel)key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(Buffer_Size)); }那么我们现在就看一下netty的代码,找一找在新连接到来的时候,netty具体做了些什么事情,而java nio又是具体在哪些地方出现的。
首先我们搞清楚一个问题,我们都知道每一个java对象,都会分配在heap当中,而如果它的引用的生命周期已经结束的情况下,它会在某一个时间点被jvm回收。那么之前在bind过程当中产生的一个NioServerSocketChannel对象,如果我们在回顾一下bind部分的代码,我们会发现,它的引用,统统都是在方法内部,以局部变量的形式存在的,在bind方法执行完毕之后,这个 NioServerSocketChannel 对象,又以什么形式出现呢?
我得出的结论是这样的,在 NioServerSocketChannel 生成之后,会为它分配一个NioEventLoop对象,之后会把 NioServerSocketChannel 注册到这个 NioEventLoop对象 当中的selector上,这样的话,相当于是 NioEventLoop对象 引用了这个NioServerSocketChannel 对象,而 NioEventLoop 是一直处于alive状态的。
那么下一个问题, NioEventLoop究竟是什么时候开始运行的呢?
我们找到了这个地方AbstractChannel$AbstractUnsafe.register这个方法,该方法用于register,看代码,首先判断当前eventLoop是否处在运行当中,如果不是的话,执行了它的execute方法。这个execute方法有2层含义,1、启动线程,2、添加一个task到该线程的task队列当中。(这部分的具体实现,我们以后再看)
if (eventLoop.inEventLoop()) {
register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Tmp Task run"); register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } }ok,到此为止,NioEventLoop已经处在运行状态了,那么我们去看看run当中都干了些什么。
在run方法当中,我们总结一下,其实就是2件事,1、我的本职工作,监控server channel的状态,发起下一步动作;2、完成临时添加的其他task。大家按照这个思路来看,就很容易理解了。之前的文章当中我们分析过,通过java reflect机制,已经成功的把selector的selectedkeys属性映射到了NioEventLoop的 selectedKeys变量当中了。protected void run() {
for (;;) { System.out.println("NioEventLoop run start"); oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select();if (wakenUp.get()) {
selector.wakeup(); } }cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime;final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);if (isShuttingDown()) {
closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }我们直接来看这段 processSelectedKeysOptimized(selectedKeys.flip()); 这个方法当中,会遍历 selectedKeys ,然后执行相应的动作,对于一个server channel来说,我们期待的就是一个accept。看看代码。请注意红色部分,对于我们目前的场景,只关注if部分即可。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; }final Object a = k.attachment();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
if (needsToSelectAgain) {
selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }之后关注 processSelectedKey(k, (AbstractNioChannel) a)的代码,还是关注红色部分,在有连接进来的时候,我们debug一下就可以知道, readyOps =16,对照一下selectionKey.OP_ACCEPT,我们知道这个时候需要server channel accept一个新的连接了。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }try {
int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);unsafe.finishConnect();
} } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }那么我们接下来看一下unsafe究竟是谁?而unsafe.read究竟干了些什么。首先我们看一下read方法的实现,来自2个class,NioMessageUnsafe和NioByteUnsafe,初步了解 NioMessageUnsafe 服务于NioServerSocketChannel,而 NioByteUnsafe 服务于NioSocketChannel,更详细的信息,我们以后再研究。找到unsafe.read的代码,AbstractNioMessageChannel$NioMessageUnsafe.read(),代码很多,我们仅关注红色部分。
public void read() {
assert eventLoop().inEventLoop(); if (!config().isAutoRead()) { removeReadOp(); }final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final boolean autoRead = config.isAutoRead(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { for (;;) { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; }if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
break; } } } catch (Throwable t) { exception = t; }int size = readBuf.size();
for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete();if (exception != null) {
if (exception instanceof IOException) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); }pipeline.fireExceptionCaught(exception);
}if (closed) {
if (isOpen()) { close(voidPromise()); } } }首先是 doReadMessages , 代码在NioServerSocketChannel当中,很明确,accept并生成一个 java nio的Socketchannel对象 ,然后new一个 NioSocketChannel 对象,把 NioServerSocketChannel 自己、一个eventloop、还有一个java nio的Socketchannel对象,放到了这个新的对象当中。之后加入到list当中,并返回1.
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();try {
if (ch != null) { buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t);try {
ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } }return 0;
}然后是 pipeline.fireChannelRead(readBuf.get(i));注意2点,首先这里的pipeline是服务于Server channel的pipeline,通常情况下,当中并没有我们业务相关的handler,而只有一个netty默认为server channel服务的ServerBootstrapAcceptor。其次,传入的参数就是新的NioSocketChannel;我们看一下它的channelRead的代码,为新Channel添加handler,设置属性、register selector.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } }for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }child.unsafe().register(child.newPromise());
}在之后,fire一个 ChannelReadComplete,这个当中涉及到autoread这个参数,我们会在后续详细讨论。
到此为止,新的channel已经生成,并且已经成功注册到了eventloop上。而在register的同时,新channel的eventloop已经开始运行了。
下面就等着客户端发消息过来了!