提到Netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到NioEventLoopGroup
这个线程池,接下来进入正题。
1. 线程模型 首先来看一段Netty的使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.wrh.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class SimpleServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded"); } } }
下面将分析第一、二行代码,看下NioEventLoopGroup类的构造函数干了些什么。其余的部分将在其他博文中分析。
1 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
从代码中可以看到这里使用了两个线程池bossGroup
和workerGroup
,那么为什么需要定义两个线程池呢?这就要说到Netty的线程模型了。
Netty
的线程模型被称为Reactor
模型,具体如图所示,图上的mainReactor
指的就是bossGroup
,这个线程池处理客户端的连接请求,并将accept
的连接注册到subReactor
的其中一个线程上;图上的subReactor
当然指的就是workerGroup
,负责处理已建立的客户端通道上的数据读写;图上还有一块ThreadPool
是具体的处理业务逻辑的线程池,一般情况下可以复用subReactor
,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的ThreadPool
。
2. NioEventLoopGroup构造函数 NioEventLoopGroup
的构造函数的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, null); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); }
NioEventLoopGroup类中的构造函数最终都是调用的父类MultithreadEventLoopGroup
如下的构造函数:
1 2 3 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }
从上面的构造函数可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()
来创建对象,即不指定线程个数,则netty给我们使用默认的线程个数,如果指定则用我们指定的线程个数。
默认线程个数相关的代码如下:
1 2 3 4 5 6 7 8 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
而SystemPropertyUtil.getInt函数的功能为:得到系统属性中指定key(这里:key=”io.netty.eventLoopThreads”)所对应的value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu的核数的2倍。
结论:如果没有设置程序启动参数(或者说没有指定key=”io.netty.eventLoopThreads”的属性值),那么默认情况下线程的个数为cpu的核数乘以2。
继续看,由于MultithreadEventLoopGroup的构造函数是调用的是其父类MultithreadEventExecutorGroup的构造函数,因此,看下此类的构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; //根据线程个数是否为2的幂次方,采用不同策略初始化chooser if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } //产生nTreads个NioEventLoop对象保存在children数组中 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果newChild方法执行失败,则对前面执行new成功的几个NioEventLoop进行shutdown处理 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } }
该构造函数干了如下三件事:
1、产生了一个线程工场:threadFactory = newDefaultThreadFactory();
1 2 3 4 5 6 7 8 9 MultithreadEventExecutorGroup.java protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass());//getClass()为:NioEventLoopGroup.class } DefaultThreadFactory.java public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); }
2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser
1 2 3 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }
3、 产生nTreads个NioEventLoop对象保存在children数组中 ,线程都是通过调用newChild方法来产生的。
1 2 3 4 5 @Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); }
这里传给NioEventLoop构造函数的参数为:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。
3. NioEventLoopGroup构造函数分析 既然上面提到来new一个NioEventLoop对象,下面我们就看下这个类以及其父类。
1 2 3 4 5 6 7 8 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); }
继续看父类 SingleThreadEventLoop的构造函数
1 2 3 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { super(parent, threadFactory, addTaskWakesUp); }
又是直接调用来父类SingleThreadEventExecutor的构造函数,继续看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.parent = parent; this.addTaskWakesUp = addTaskWakesUp;//false thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { //调用NioEventLoop类的run方法 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); taskQueue = newTaskQueue(); } protected Queue<Runnable> newTaskQueue() { return new LinkedBlockingQueue<Runnable>(); }
主要干如下两件事:
1、利用ThreadFactory创建来一个Thread,传入了一个Runnable对象,该Runnable重写的run代码比较长,不过重点仅仅是调用NioEventLoop类的run方法。
2、使用LinkedBlockingQueue类初始化taskQueue 。
其中,newThread方法的代码如下:
DefaultThreadFactory.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { //判断是否是守护线程,并进行设置 if (t.isDaemon()) { if (!daemon) { t.setDaemon(false); } } else { if (daemon) { t.setDaemon(true); } } //设置其优先级 if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(r, name); }
FastThreadLocalThread.java
1 2 3 public FastThreadLocalThread(Runnable target, String name) { super(target, name);// FastThreadLocalThread extends Thread }
到这里,可以看到底层还是借助于类似于Thread thread = new Thread(r)这种方式来创建线程。
关于NioEventLoop对象可以得到的点有,初始化了如下4个属性。
1、NioEventLoopGroup (在父类SingleThreadEventExecutor中)
2、selector
3、provider
4、thread (在父类SingleThreadEventExecutor中)
4. 总结 关于NioEventLoopGroup,总结如下
1、 如果不指定线程数,则线程数为:CPU的核数*2
2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser
3、产生nThreads个NioEventLoop对象保存在children数组中。
可以理解NioEventLoop就是一个线程,线程NioEventLoop中里面有如下几个属性:
1、NioEventLoopGroup (在父类SingleThreadEventExecutor中)
2、selector
3、provider
4、thread (在父类SingleThreadEventExecutor中)
更通俗点就是:NioEventLoopGroup就是一个线程池,NioEventLoop就是一个线程。NioEventLoopGroup线程池中有N个NioEventLoop线程。