1. Unsafe 顾名思义,unsafe是不安全的意思,就是告诉你不要在应用程序里面直接使用Unsafe以及他的衍生类对象。
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
Unsafe 在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHandle(); SocketAddress localAddress(); SocketAddress remoteAddress(); void register(EventLoop eventLoop, ChannelPromise promise); void bind(SocketAddress localAddress, ChannelPromise promise); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); void disconnect(ChannelPromise promise); void close(ChannelPromise promise); void closeForcibly(); void beginRead(); void write(Object msg, ChannelPromise promise); void flush(); ChannelPromise voidPromise(); ChannelOutboundBuffer outboundBuffer(); }
1.1 Unsafe 继承结构
在 Unsafe
1 2 3 4 5 6 public interface NioUnsafe extends Unsafe { SelectableChannel ch(); void finishConnect(); void read(); void forceFlush(); }
1.2 Unsafe的分类 从以上继承结构来看,我们可以总结出两种类型的Unsafe分类,一个是与连接的字节数据读写相关的NioByteUnsafe
1 2 3 4 5 protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
最后一行已经与jdk底层以及netty中的ByteBuf相关,将jdk的 SelectableChannel
1 2 3 4 5 6 7 8 9 protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; }
1 2 3 4 5 @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
中的字节数据写到jdk的 SelectableChannel
2. pipeline 中的head NioEventLoop
1 2 3 4 5 6 7 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //新连接的已准备接入或者已存在的连接有数据可读 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); // 创建ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; do { // 分配一个ByteBuf byteBuf = allocHandle.allocate(allocator); // 将数据读取到分配的ByteBuf中去 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } // 触发事件,将会引发pipeline的读事件传播 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); pipeline.fireChannelReadComplete(); }
数据读完之后,调用 pipeline.fireChannelRead(byteBuf);
这里,我们的重点其实就是 pipeline.fireChannelRead(byteBuf);
1 2 3 4 5 6 7 8 final AbstractChannelHandlerContext head; //... head = new HeadContext(this); public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } }
从head节点继承的两个接口看,TA既是一个ChannelHandlerContext,同时又属于inBound和outBound Handler
3. pipeline 中inBound事件传播 我们接着上面的 AbstractChannelHandlerContext.invokeChannelRead(head, msg); 这个静态方法看,参数传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。
1 2 3 4 5 6 7 8 9 10 11 12 13 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
调用这个 Context (也就是 head) 的 invokeChannelRead 方法,并传入数据。我们再看看head中 invokeChannelRead 方法的实现,实际上是在headContext的父类AbstractChannelHandlerContext中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } public ChannelHandler handler() { return this; }
上面 handler()
就是``headContext中的handler,也就是headContext自身,也就是调用 head 的 channelRead 方法。那么这个方法是怎么实现的呢?
1 2 3 4 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
什么都没做,调用 Context 的 fire 系列方法,将请求转发给下一个节点。我们这里是 fireChannelRead 方法,注意,这里方法名字都挺像的。需要细心区分。下面我们看看 Context 的成员方法 fireChannelRead:
1 2 3 4 5 @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
这个是 head 的抽象父类 AbstractChannelHandlerContext 的实现,该方法再次调用了静态 fire 系列方法,但和上次不同的是,不再放入 head 参数了,而是使用 findContextInbound 方法的返回值。从这个方法的名字可以看出,是找到入站类型的 handler。我们看看方法实现:
1 2 3 4 5 6 7 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
该方法很简单,找到当前 Context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。我们来看看 invokeChannelRead(findContextInbound(), msg);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
上面我们找到了next节点(inbound类型的),然后直接调用 next.invokeChannelRead(m);如果这个next是我们自定义的handler,此时我们自定义的handler的父类是AbstractChannelHandlerContext,则又回到了 AbstractChannelHandlerContext中实现的 invokeChannelRead,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } public ChannelHandler handler() { return this; }
此时的handler()就是我们自定义的handler了,然后调用我们自定义handler中的 channelRead (this , msg);
请求进来时,pipeline 会从 head 节点开始输送,通过配合 invoker 接口的 fire 系列方法,实现 Context 链在 pipeline 中的完美传递。最终到达我们自定义的 handler。
注意:此时如果我们想继续向后传递该怎么办呢?我们前面说过,可以调用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一样,调用 fire 系列方法,直接向后传递就 ok 了。
4. pipeline 中的tail 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // This may not be a configuration error and so don't log anything. // The event may be superfluous for the current pipeline configuration. ReferenceCountUtil.release(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } }
1 2 3 4 5 6 7 8 9 protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
5. pipeline 中的outBound事件传播 上一节中,我们在阐述tail节点的功能时,忽略了其父类AbstractChannelHandlerContext
1 2 Channel channel = getChannel(userInfo); channel.writeAndFlush(pushInfo);
这段代码的含义就是根据用户信息拿到对应的Channel,然后给用户推送消息,跟进 channel.writeAndFlush
1 2 3 public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
1 2 3 public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
Channel 中大部分outBound事件都是从tail开始往外传播, writeAndFlush()
1 2 3 4 5 6 7 8 9 public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
1 2 3 4 5 6 7 private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
,然后调用next.invokeWriteAndFlush(m, promise)
1 2 3 4 5 6 7 8 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
1 2 3 4 5 6 7 private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
可以看到,数据开始出站,从后向前开始流动,和入站的方向是反的。那么最后会走到哪里呢,当然是走到 head 节点,因为 head 节点就是 outbound 类型的 handler。
1 2 3 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
调用了 底层的 unsafe 操作数据,这里,加深了我们对head节点的理解,即所有的数据写出都会经过head节点
当执行完这个 write 方法后,方法开始退栈。逐步退到 unsafe 的 read 方法,回到最初开始的地方,然后继续调用 pipeline.fireChannelReadComplete() 方法
6. 总结 总结一下一个请求在 pipeline 中的流转过程:
调用 pipeline 的 fire 系列方法,这些方法是接口 invoker 设计的,pipeline 实现了 invoker 的所有方法,inbound 事件从 head 开始流入,outbound 事件从 tail 开始流出。
pipeline 会将请求交给 Context,然后 Context 通过抽象父类 AbstractChannelHandlerContext 的 invoke 系列方法(静态和非静态的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各个 Context 的数据流转。
当入站过程中,调用 了出站的方法,那么请求就不会向后走了。后面的处理器将不会有任何作用。想继续相会传递就调用 Context 的 fire 系列方法,让 Netty 在内部帮你传递数据到下一个节点。如果你想在整个通道传递,就在 handler 中调用 channel 或者 pipeline 的对应方法,这两个方法会将数据从头到尾或者从尾到头的流转一遍。