正文:

本文接着前两篇文章来讲,主要讲服务端类剩下的部分,我们还是来先看看服务端的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Created by chenhao on 2019/9/4.
*/
public final class SimpleServer {

public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new SimpleServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture f = b.bind(8888).sync();

f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

在前面两篇博文中从源码的角度分析了如下几行代码主要做了哪些工作。

1
2
3
4
5
6
7
8
9
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new SimpleServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

本篇博文将从源码的角度分析ChannelFuture f = b.bind(8888).sync() 的内部实现。这样就完成了Netty服务器端启动过程的源码分析。

1. 源码分析ChannelFuture f = b.bind(8888).sync()

AbstractBootstrap.java

1
2
3
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}

我们接着看重载的bind

1
2
3
4
5
6
7
public ChannelFuture bind(SocketAddress localAddress) {
validate();//相关参数的检查
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);//下面将分析
}

该函数主要看两点:validate()和doBind(localAddress)

1.1 validate()方法

1
2
3
4
5
6
7
8
9
10
11
12
//函数功能:检查相关参数是否设置了
@SuppressWarnings("unchecked")
public B validate() {
if (group == null) {//这里的group指的是:b.group(bossGroup, workerGroup)代码中的bossGroup
throw new IllegalStateException("group not set");
}

if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}

该方法主要检查了两个参数,一个是group,一个是channelFactory,在这里可以想一想这两个参数是在哪里以及何时被赋值的?答案是在如下代码块中被赋值的,其中是将bossGroup赋值给了group,将BootstrapChannelFactory赋值给了channelFactory.

1
2
3
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)

1.2 doBind(localAddress)方法

doBind方法的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();//2
if (regFuture.cause() != null) {
return regFuture;
}

final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}

return promise;
}

doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点:

1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。

2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。

3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。

4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。

第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。

下面将分成4部分对每行代码具体做了哪些工作进行详细分析。

1.3 initAndRegister()

该方法的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final ChannelFuture initAndRegister() {
//结论:这里的channel为一个NioServerSocketChannel对象,具体分析见后面
final Channel channel = channelFactory().newChannel();//1
try {
init(channel);//2
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = group().register(channel);//3
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

通过函数名以及内部调用的函数可以猜测该函数干了两件事情:

1、初始化一个Channel,要想初始化,肯定要先得到一个Channel。

1
2
final Channel channel = channelFactory().newChannel();//1
init(channel);//2

2、将Channel进行注册。

1
ChannelFuture regFuture = group().register(channel);//3

下面我们将分析这几行代码内部干来些什么。

1.3.1 final Channel channel = channelFactory().newChannel();

在上一篇文章中分析中,我们知道b.channel(NioServerSocketChannel.class)的功能为:设置父类属性channelFactory 为: BootstrapChannelFactory类的对象。其中这里BootstrapChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class

因此,final Channel channel = channelFactory().newChannel();就是调用的BootstrapChannelFactory类中的newChannel()方法,该方法的具体内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;

BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}

看到这个类,我们可以得到的结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例。

1.3.2 NioServerSocketChannel构造器

下面将看下NioServerSocketChannel类的构造函数做了哪些工作。

NioServerSocketChannel类的继承体系结构如下:

img

其无参构造函数如下:

1
2
3
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

无参构造函数中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()

函数newSocket的功能为:利用SelectorProvider产生一个SocketChannelImpl对象。

1
2
3
4
5
6
7
8
9
10
11
12
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}

无参构造函数通过newSocket函数产生了一个SocketChannelImpl对象

然后调用了如下构造函数,我们继续看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//父类AbstractNioMessageChannel的构造函数
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
//父类 AbstractNioChannel的构造函数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
try {
ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//父类AbstractChannel的构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}

new NioServerSocketChannel()产生一个实例对象时,调用上面这么多构造函数主要干了两件事情:

1、产生来一个SocketChannelImpl类的实例,设置到ch属性中,并设置为非阻塞的。

1
2
this.ch = ch;
ch.configureBlocking(false);

2、设置了config属性

1
config = new NioServerSocketChannelConfig(this, javaChannel().socket()

3、设置SelectionKey.OP_ACCEPT事件

1
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT

4、设置unsafe属性

1
2
3
4
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}

主要作用为:用来负责底层的connect、register、read和write等操作。

5、设置pipeline属性

1
pipeline = new DefaultChannelPipeline(this);

每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。

这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。

结论:**final Channel channel = channelFactory().newChannel();**这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline

1.4 init(channel)

init方法的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
void init(Channel channel) throws Exception {
//1、设置新接入channel的option
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);//NioServerSocketChannelConfig
}
//2、设置新接入channel的attr
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3、设置handler到pipeline上
ChannelPipeline p = channel.pipeline();
if (handler() != null) {//这里的handler()返回的就是第二部分.handler(new SimpleServerHandler())所设置的SimpleServerHandler
p.addLast(handler());
}

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//p.addLast()向serverChannel的流水线处理器中加入了一个ServerBootstrapAcceptor,从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}

该函数的功能为:

1、设置channel的options

如果没有设置,则options为空,该属性在ServerBootstrap类中的定义如下

1
Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

options可能如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);

if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
setAllocator((ByteBufAllocator) value);
} else if (option == RCVBUF_ALLOCATOR) {
setRecvByteBufAllocator((RecvByteBufAllocator) value);
} else if (option == AUTO_READ) {
setAutoRead((Boolean) value);
} else if (option == AUTO_CLOSE) {
setAutoClose((Boolean) value);
} else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
setWriteBufferHighWaterMark((Integer) value);
} else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
setWriteBufferLowWaterMark((Integer) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else {
return false;
}

return true;
}

2、设置channel的attrs

如果没有设置,则attrs为空,该属性在ServerBootstrap类中的定义如下

1
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();

3、设置handler到channel的pipeline上

其中,这里的handler为:在博文中分析的通过b.handler(new SimpleServerHandler())所设置的SimpleServerHandler对象

4、在pipeline上添加来一个ChannelInitializer对象,其中重写来initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,
从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器

看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.

1.5 group().register(channel)

回到 initAndRegister 方法中,继续看 config().group().register(channel) 这行代码,config 方法返回了 ServerBootstrapConfig,这个 ServerBootstrapConfig 调用了 group 方法,实际上就是 bossGroup。bossGroup 调用了 register 方法。

前面的分析我们知道group为:NioEvenLoopGroup,其继承MultithreadEventLoopGroup,该类中的register方法如下:

1
2
3
4
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);//调用了NioEvenLoop对象中的register方法,NioEventLoop extends SingleThreadEventLoop
}

next()方法的代码如下,其功能为选择下一个NioEventLoop对象。

1
2
3
4
@Override
public EventExecutor next() {
return chooser.next();//调用MultithreadEventExecutorGroup中的next方法
}

根据线程个数nThreads是否为2的幂次方来选择chooser,其中这两个chooser为: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,这两个chooser功能都是一样,只是求余的方式不一样。

next()方法返回的是一个NioEvenLoop对象

1
2
3
4
5
6
7
8
9
10
11
12
13
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];//利用2的N次方法的特点,使用&求余更快。
}
}

private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}

结论:由于NioEventLoopGroup中维护着多个NioEventLoop,next方法回调用chooser策略找到下一个NioEventLoop,并执行该对象的register方法进行注册。

由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}

channel.unsafe().register(this, promise);
return promise;
}

在本博文第1部分的NioServerSocketChannel实例化中设置来unsafe属性,具体是调用如下的方法来设置的,因此这里的channel.unsafe()就是NioMessageUnsafe实例。

1
2
3
4
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}

channel.unsafe().register(this, promise)这行代码调用的是AbstractUnsafe类中的register方法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//判断该channel是否已经被注册到EventLoop中
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

//1 将eventLoop设置在NioServerSocketChannel上
AbstractChannel.this.eventLoop = eventLoop;

//判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,则添加一个任务到该线程中
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() { //重点
@Override
public void run() {
register0(promise);//分析
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

上面的重点是register0(promise)方法。基本逻辑为:

1、通过调用eventLoop.inEventLoop()方法判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,说明该EventLoop在等待并没有执行权,则进行第二步。

AbstractEventExecutor.java

1
2
3
4
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}

SingleThreadEventExecutor.java

1
2
3
4
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}

2、既然该EventLoop中的线程此时没有执行权,但是我们可以提交一个任务到该线程中,等该EventLoop的线程有执行权的时候就自然而然的会执行此任务,而该任务负责调用register0方法,这样也就达到了调用register0方法的目的。

下面看register0这个方法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
safeSetSuccess(promise);
//执行完,控制台输出:channelRegistered
pipeline.fireChannelRegistered();
if (isActive()) { //分析
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

在上面的代码中,是通过调用doRegister()方法完成NioServerSocketChannel的注册,该方法的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

protected SelectableChannel javaChannel() {
return ch;
}

在本博文的第1部分的NioServerSocketChannel的实例化分析中,我们知道这里的javaChannel()方法返回的ch为实例化NioServerSocketChannel时产生的一个SocketChannelImpl类的实例,并设置为非阻塞的,具体见本博文的第1部分。

selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel注册到Selector中。

回顾下,这里的eventLoop().selector是什么?答案是:KQueueSelectorImpl对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}

private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//...省略了一部分代码
return selector;
}

ServerSocketChannel注册完之后,接着执行pipeline.fireChannelRegistered方法。

1
2
3
4
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
return this;
}

我们看到**invokeChannelRegistered(****this.head)**传的参数是head,这个head我们再下一篇文章中讲,继续往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRegistered();
}
});
}

}

next.invokeChannelRegistered();

1
2
3
4
5
6
7
8
9
10
11
12
private void invokeChannelRegistered() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRegistered(this);
} catch (Throwable var2) {
this.notifyHandlerException(var2);
}
} else {
this.fireChannelRegistered();
}

}

接着看看this.handler(),实际上就是head的handler()

1
2
3
public ChannelHandler handler() {
return this;
}

返回的是this,那接着看 head中的channelRegistered(this)

1
2
3
4
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}

继续看ctx.fireChannelRegistered();

1
2
3
4
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(this.findContextInbound());
return this;
}

我们看看this.findContextInbound()

1
2
3
4
5
6
7
8
9
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;

do {
ctx = ctx.next;
} while(!ctx.inbound);

return ctx;
}

我们看到 ctx = ctx.next; 实际上是从head开始找,找到第一个 inbound 的hander

1
2
3
4
5
6
7
8
9
10
11
12
13
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRegistered();
}
});
}

}

最后执行****next.invokeChannelRegistered();

pipeline中维护了handler链表,还记得之前.handler(new SimpleServerHandler())初始化的handler在本博文的第1.2部分的分析中介绍了此handler被添加到此pipeline中了,通过遍历链表,执行InBound类型handler的channelRegistered方法

因此执行到这里,我们的控制台就回输出:channelRegistered,这行信息。

到这里,我们就将doBind方法final ChannelFuture regFuture = initAndRegister();给分析完了,得到的结论如下:

1、通过反射产生了一个NioServerSocketChannle对象。

2、完成了初始化

3、将NioServerSocketChannel进行了注册。

接下来我们分析doBind方法的剩余部分代码主要做了什么,

源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();//2
if (regFuture.cause() != null) {
return regFuture;
}

final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}

return promise;
}

1.6 doBind0(regFuture, channel, localAddress, promise);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

该函数主要是提交了一个Runnable任务到NioEventLoop线程中来进行处理。,这里先看一下NioEventLoop类的execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,如果不是,则先启动线程,然后添加任务到任务队列中去
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
//如果
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

当提交的任务被线程执行后,则会执行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)这行代码,这行代码完成的功能为:实现channel与端口的绑定。

具体如下:

1
2
3
4
5
6
AbstractChannel.java    

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

在该方法中直接调用了pipeline的bind方法,这里的pipeline时DefaultChannelPipeline的实例。

1
2
3
4
5
6
DefaultChannelPipeline.java 

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

在上面方法中直接调用了TailContext实例tail的bind方法,tail在下一篇博文中有详细的介绍。继续看tail实例的bind方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
AbstractChannelHandlerContext.java   

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//...省略有效性检查

final AbstractChannelHandlerContext next = findContextOutbound();//
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}

return promise;
}

此上面bind函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。

1
2
3
4
5
6
7
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}

在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾。 head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口和ChannelInboundHandler 接口, 并且它的 outbound 字段为 true.而tail 是 TailContext 的实例,它实现了ChannelInboundHandler 接口,并且其outbound 字段为 false,inbound 字段为true。 基于此在如上的bind函数中调用了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 对象其实就是 head.

继续看,在pipelie的双向链表中找到第一个outbound=true的AbstractChannelHandlerContext节点head后,然后调用此节点的invokeConnect方法,该方法的代码如下:

1
2
3
4
5
6
7
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}

HeadContext类中的handler()方法代码如下:

1
2
3
4
@Override
public ChannelHandler handler() {
return this;
}

该方法返回的是其本身,这是因为HeadContext由于其继承AbstractChannelHandlerContext以及实现了ChannelHandler接口使其具有Context和Handler双重特性。

继续看,看HeadContext类中的bind方法,代码如下:

1
2
3
4
5
6
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

unsafe这个字段是在HeadContext构造函数中被初始化的,如下:

1
2
3
4
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}

而此构造函数中的pipeline.channel().unsafe()这行代码返回的就是在本博文前面研究NioServerSocketChannel这个类的构造函数中所初始化的一个实例,如下:

1
unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe对象。  

接下来看NioMessageUnsafe类中的bind方法(准确来说:该方法在AbstractUnsafe中),该类bind具体方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//...省略了部分代码
boolean wasActive = isActive();
try {
doBind(localAddress);//核心代码
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

上面的核心代码就是:doBind(localAddress);需要注意的是,此doBind方法是在NioServerSocketChannel类中的doBind方法,不是其他类中的。

NioServerSocketChannel类中的doBind方法代码如下:

1
2
3
4
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}

上面方法中javaChannel()方法返回的是NioServerSocketChannel实例初始化时所产生的Java NIO ServerSocketChannel实例(更具体点为ServerSocketChannelImple实例)。 等价于语句serverSocketChannel.socket().bind(localAddress)完成了指定端口的绑定,这样就开始监听此端口。绑定端口成功后,是这里调用了我们自定义handler的channelActive方法,在绑定之前,isActive()方法返回false,绑定之后返回true。

1
2
3
4
@Override
public boolean isActive() {
return javaChannel().socket().isBound();
}

这样,就进入了如下的if条件的代码块中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

private void invokeLater(Runnable task) {
try {
//省略了部分代码
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}

进而开始执行 pipeline.fireChannelActive();这行代码 ,这行代码的具体调用链如下所示:

DefaultChannelPipeline.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();

if (channel.config().isAutoRead()) {
channel.read();
}

return this;
}

@Override
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this;
}

private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

2. 总结

最后,我们来做下总结,netty启动一个服务所经过的流程
1.设置启动类参数,最重要的就是设置channel
2.创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
3.init 初始化这个 NioServerSocketChannel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并触发addHandler事件

4.config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 根据group长度取模得到NioEventLoop ,将 NioServerSocketChannel 注册到 NioEventLoop 中的 selector 上,然后触发 channelRegistered事件

5.调用到jdk底层做端口绑定,并触发active事件


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.