1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 /** * Created by chenhao on 2019/9/4. */ public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
本篇博文将从源码的角度分析ChannelFuture f = b.bind(8888).sync()
1. 源码分析ChannelFuture f = b.bind(8888).sync() AbstractBootstrap.java
1 2 3 public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
1 2 3 4 5 6 7 public ChannelFuture bind(SocketAddress localAddress) { validate();//相关参数的检查 if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);//下面将分析 }
1.1 validate()方法 1 2 3 4 5 6 7 8 9 10 11 12 //函数功能:检查相关参数是否设置了 @SuppressWarnings("unchecked") public B validate() { if (group == null) {//这里的group指的是:b.group(bossGroup, workerGroup)代码中的bossGroup throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }
1.2 doBind(localAddress)方法 doBind方法的源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
1.3 initAndRegister() 该方法的具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final ChannelFuture initAndRegister() { //结论:这里的channel为一个NioServerSocketChannel对象,具体分析见后面 final Channel channel = channelFactory().newChannel();//1 try { init(channel);//2 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
1.3.1 final Channel channel = channelFactory().newChannel(); 在上一篇文章的分析中,我们知道 b.channel(NioServerSocketChannel.class) 的功能为:将父类属性 channelFactory 设置为 BootstrapChannelFactory 类的对象,其中该 BootstrapChannelFactory 对象包含一个 clazz 属性,其值为 NioServerSocketChannel.class。
因此,final Channel channel = channelFactory().newChannel();就是调用的BootstrapChannelFactory类中的newChannel()方法,该方法的具体内容为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; BootstrapChannelFactory(Class<? extends T> clazz) { this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
1.3.2 NioServerSocketChannel构造器 下面将看下NioServerSocketChannel类的构造函数做了哪些工作。
1 2 3 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
无参构造函数中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
1 2 3 4 5 6 7 8 9 10 11 12 private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } //父类AbstractNioMessageChannel的构造函数 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } //父类 AbstractNioChannel的构造函数 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT try { ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //父类AbstractChannel的构造函数 protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
new NioServerSocketChannel()产生一个实例对象时,调用上面这么多构造函数,主要是为该实例设置了下面这几个属性:
// Channel configuration, built from this channel and the socket of the underlying JDK channel.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
1 this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
1 2 3 4 @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
1 pipeline = new DefaultChannelPipeline(this);
这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。
结论:**final Channel channel = channelFactory().newChannel();**这行代码的作用为通过反射产生了一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:ServerSocketChannel、NioServerSocketChannelConfig、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline
1.4 init(channel) init方法的具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override void init(Channel channel) throws Exception { //1、设置新接入channel的option final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options);//NioServerSocketChannelConfig } //2、设置新接入channel的attr final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //3、设置handler到pipeline上 ChannelPipeline p = channel.pipeline(); if (handler() != null) {//这里的handler()返回的就是第二部分.handler(new SimpleServerHandler())所设置的SimpleServerHandler p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //p.addLast()向serverChannel的流水线处理器中加入了一个ServerBootstrapAcceptor,从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
1 Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == CONNECT_TIMEOUT_MILLIS) { setConnectTimeoutMillis((Integer) value); } else if (option == MAX_MESSAGES_PER_READ) { setMaxMessagesPerRead((Integer) value); } else if (option == WRITE_SPIN_COUNT) { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); } else if (option == RCVBUF_ALLOCATOR) { setRecvByteBufAllocator((RecvByteBufAllocator) value); } else if (option == AUTO_READ) { setAutoRead((Boolean) value); } else if (option == AUTO_CLOSE) { setAutoClose((Boolean) value); } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) { setWriteBufferHighWaterMark((Integer) value); } else if (option == WRITE_BUFFER_LOW_WATER_MARK) { setWriteBufferLowWaterMark((Integer) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); } else { return false; } return true; }
1 private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
其中,这里的handler为:在博文中分析的通过b.handler(new SimpleServerHandler())
4、在pipeline上添加了一个ChannelInitializer对象,其中重写了initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor, 从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
1.5 group().register(channel) 回到 initAndRegister 方法中,继续看 group().register(channel) 这行代码(在较新的 Netty 版本中写作 config().group().register(channel),其中 config 方法返回 ServerBootstrapConfig,再由它调用 group 方法)。这里 group() 返回的实际上就是 bossGroup,也就是由 bossGroup 调用了 register 方法。
1 2 3 4 @Override public ChannelFuture register(Channel channel) { return next().register(channel);//调用了NioEvenLoop对象中的register方法,NioEventLoop extends SingleThreadEventLoop }
1 2 3 4 @Override public EventExecutor next() { return chooser.next();//调用MultithreadEventExecutorGroup中的next方法 }
根据线程个数nThreads是否为2的幂次方来选择chooser,其中这两个chooser为: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,这两个chooser功能都是一样,只是求余的方式不一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1];//利用2的N次方法的特点,使用&求余更快。 } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
1 2 3 4 @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
channel.unsafe().register(this, promise)这行代码调用的是AbstractUnsafe类中的register方法,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } //判断该channel是否已经被注册到EventLoop中 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //1 将eventLoop设置在NioServerSocketChannel上 AbstractChannel.this.eventLoop = eventLoop; //判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,则添加一个任务到该线程中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { //重点 @Override public void run() { register0(promise);//分析 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
1 2 3 4 @Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
1 2 3 4 @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } doRegister(); registered = true; safeSetSuccess(promise); //执行完,控制台输出:channelRegistered pipeline.fireChannelRegistered(); if (isActive()) { //分析 pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } protected SelectableChannel javaChannel() { return ch; }
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel注册到Selector中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //...省略了一部分代码 return selector; }
1 2 3 4 public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(this.head); return this; }
1 2 3 4 5 6 7 8 9 10 11 12 13 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
1 2 3 4 5 6 7 8 9 10 11 12 private void invokeChannelRegistered() { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRegistered(this); } catch (Throwable var2) { this.notifyHandlerException(var2); } } else { this.fireChannelRegistered(); } }
1 2 3 4 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }
1 2 3 4 public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(this.findContextInbound()); return this; }
1 2 3 4 5 6 7 8 9 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while(!ctx.inbound); return ctx; }
我们看到 ctx = ctx.next; 实际上是从 head 开始沿链表向后找,直到找到第一个 inbound 类型的 handler
1 2 3 4 5 6 7 8 9 10 11 12 13 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
pipeline中维护了handler链表。还记得之前通过.handler(new SimpleServerHandler())设置的handler吗?在本博文第1.4部分(init方法)的分析中,我们已经看到此handler被添加到了该pipeline中。这里通过遍历链表,依次执行InBound类型handler的channelRegistered方法
到这里,我们就将doBind方法final ChannelFuture regFuture = initAndRegister();给分析完了,得到的结论如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
1.6 doBind0(regFuture, channel, localAddress, promise); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop();//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,如果不是,则先启动线程,然后添加任务到任务队列中去 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); //如果 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
当提交的任务被线程执行后,则会执行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)这行代码,这行代码完成的功能为:实现channel与端口的绑定。
1 2 3 4 5 6 AbstractChannel.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
1 2 3 4 5 6 DefaultChannelPipeline.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 AbstractChannelHandlerContext.java @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略有效性检查 final AbstractChannelHandlerContext next = findContextOutbound();// EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
此上面bind函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。
1 2 3 4 5 6 7 private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾。 head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口和ChannelInboundHandler 接口, 并且它的 outbound 字段为 true.而tail 是 TailContext 的实例,它实现了ChannelInboundHandler 接口,并且其outbound 字段为 false,inbound 字段为true。 基于此在如上的bind函数中调用了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 对象其实就是 head.
1 2 3 4 5 6 7 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
1 2 3 4 @Override public ChannelHandler handler() { return this; }
1 2 3 4 5 6 @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
1 2 3 4 HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); }
1 unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略了部分代码 boolean wasActive = isActive(); try { doBind(localAddress);//核心代码 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
1 2 3 4 @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
上面方法中javaChannel()方法返回的是NioServerSocketChannel实例初始化时所产生的Java NIO ServerSocketChannel实例(更具体点为ServerSocketChannelImpl实例)。 等价于语句serverSocketChannel.socket().bind(localAddress, backlog),它完成了指定端口的绑定,这样就开始监听此端口。绑定端口成功后,正是在这里调用了我们自定义handler的channelActive方法:在绑定之前,isActive()方法返回false,绑定之后返回true。
1 2 3 4 @Override public boolean isActive() { return javaChannel().socket().isBound(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } private void invokeLater(Runnable task) { try { //省略了部分代码 eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
进而开始执行 pipeline.fireChannelActive();这行代码 ,这行代码的具体调用链如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; } @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelActive(); } }); } return this; } private void invokeChannelActive() { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
2. 总结 最后,我们来做下总结,netty启动一个服务所经过的流程 1.设置启动类参数,最重要的就是设置channel 2.创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等 3.init 初始化这个 NioServerSocketChannel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并触发handlerAdded事件
4.config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 根据group长度取模得到NioEventLoop ,将 NioServerSocketChannel 注册到 NioEventLoop 中的 selector 上,然后触发 channelRegistered事件