private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); //检查该SelectionKey是否有效,如果无效,则关闭channel if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }
try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
@Override public void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; }
final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); }
ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { //1、分配缓存 byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes();//可写的字节容量 //2、将socketChannel数据写入缓存 int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } //3、触发pipeline的ChannelRead事件来对byteBuf进行后续处理 pipeline.fireChannelRead(byteBuf); byteBuf = null;
// stop reading if (!config.isAutoRead()) { break; }
if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead);
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { if (minimum <= 0) { throw new IllegalArgumentException("minimum: " + minimum); } if (initial < minimum) { throw new IllegalArgumentException("initial: " + initial); } if (maximum < initial) { throw new IllegalArgumentException("maximum: " + maximum); }
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; }
int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { //这里的情况就是 a < size <= b 的情况 return mid + 1; } } }
2. byteBuf = allocHandle.allocate(allocator);
申请一块指定大小的内存
AdaptiveRecvByteBufAllocator#HandleImpl
1 2 3 4
/**
 * Requests an I/O buffer from the given allocator, using the current value of
 * {@code nextReceiveBufferSize} as its initial capacity.
 *
 * @param alloc the allocator that creates the buffer
 * @return the buffer returned by {@code alloc.ioBuffer(...)}
 */
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    final int capacity = nextReceiveBufferSize;
    return alloc.ioBuffer(capacity);
}
这里直接调用了 alloc.ioBuffer(nextReceiveBufferSize) 方法,继续往下看:
AbstractByteBufAllocator.java
1 2 3 4 5 6 7
/**
 * Creates a buffer intended for I/O with the given initial capacity.
 *
 * <p>Delegates to {@code directBuffer} when {@code PlatformDependent.hasUnsafe()}
 * reports {@code true}, and to {@code heapBuffer} otherwise.
 *
 * @param initialCapacity the initial capacity passed through to the chosen factory method
 * @return the buffer produced by {@code directBuffer} or {@code heapBuffer}
 */
@Override
public ByteBuf ioBuffer(int initialCapacity) {
    return PlatformDependent.hasUnsafe()
            ? directBuffer(initialCapacity)
            : heapBuffer(initialCapacity);
}
/**
 * Forwards the transfer to the wrapped {@code buf} and returns its result unchanged.
 *
 * @param in the channel to read from
 * @param length the maximum number of bytes to transfer
 * @return whatever {@code buf.writeBytes(in, length)} returns
 * @throws IOException if the delegate call throws it
 */
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    final int written = buf.writeBytes(in, length);
    return written;
}
AbstractByteBuf.java
1 2 3 4 5 6 7 8 9 10
/**
 * Transfers up to {@code length} bytes from the channel into this buffer at the
 * current {@code writerIndex}.
 *
 * <p>Calls {@code ensureAccessible()} and {@code ensureWritable(length)} first, then
 * delegates the transfer to {@code setBytes}. The {@code writerIndex} is advanced only
 * when {@code setBytes} reports a positive count; a zero or negative result is returned
 * as-is with the index untouched.
 *
 * @param in the channel to read from
 * @param length the maximum number of bytes to transfer
 * @return the value returned by {@code setBytes}
 * @throws IOException if the underlying transfer throws it
 */
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);

    final int transferred = setBytes(writerIndex, in, length);
    if (transferred <= 0) {
        // Nothing was written (or a non-positive status was reported): leave writerIndex alone.
        return transferred;
    }
    writerIndex += transferred;
    return transferred;
}
/**
 * Reads bytes from the underlying file descriptor into {@code dst}.
 *
 * <p>The read is performed while holding {@code positionLock}. The native read is
 * retried for as long as it reports {@code IOStatus.INTERRUPTED} and the channel is
 * still open.
 *
 * @param dst the buffer that receives the bytes
 * @return {@code 0} if the channel is found not open after {@code begin()}; otherwise
 *         the result of {@code IOStatus.normalize(n)} for the last read status
 * @throws NonReadableChannelException if the {@code readable} flag is false
 * @throws IOException if the underlying read fails
 */
public int read(ByteBuffer dst) throws IOException {
    // NOTE(review): presumably throws if the channel is already closed - confirm in ensureOpen().
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;   // last status/byte count from the native read; also consulted in finally
        int ti = -1; // handle returned by threads.add(), handed back to threads.remove() in finally
        try {
            begin();
            ti = threads.add();
            // Channel is no longer open after begin(): report zero bytes read.
            if (!isOpen())
                return 0;
            // Retry the native read while it reports INTERRUPTED and the channel stays open.
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            // NOTE(review): normalize() presumably maps internal status codes to the
            // public return convention - confirm in IOStatus.
            return IOStatus.normalize(n);
        } finally {
            // Always runs: hand back the thread handle, then close the begin()/end() pair,
            // passing whether a positive number of bytes was read.
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
// stop reading if (!config.isAutoRead()) { break; }
if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead);