<GlobalNamingResources> <Resource name="UserDatabase" auth="Container" type="org.apache.catalina.UserDatabase" description="User database that can be updated and saved" factory="org.apache.catalina.users.MemoryUserDatabaseFactory" pathname="conf/tomcat-users.xml" /> </GlobalNamingResources>
// Make sure parseBodyMethodsSet has a default // 2. 设置接受body的method列表,默认为POST if (null == parseBodyMethodsSet) { setParseBodyMethods(getParseBodyMethods()); }
if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) { throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr", getProtocolHandlerClassName())); } if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() && protocolHandler instanceof AbstractHttp11JsseProtocol) { AbstractHttp11JsseProtocol<?> jsseProtocolHandler = (AbstractHttp11JsseProtocol<?>) protocolHandler; if (jsseProtocolHandler.isSSLEnabled() && jsseProtocolHandler.getSslImplementationName() == null) { // OpenSSL is compatible with the JSSE configuration, so use it if APR is available jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName()); } }
// Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; } setStopLatch(new CountDownLatch(pollerThreadCount));
public final void start() throws Exception { // 1. `bind()`已经在`init()`中分析过了 if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } startInternal(); }
@Override public void startInternal() throws Exception { if (!running) { running = true; paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0;
// Loop until we receive a shutdown command while (running) {
// Loop if endpoint is paused // 1. 运行过程中,如果`Endpoint`暂停了,则`Acceptor`进行自旋(间隔50毫秒) ` while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } // 2. 如果`Endpoint`终止运行了,则`Acceptor`也会终止 if (!running) { break; } state = AcceptorState.RUNNING;
try { //if we have reached max connections, wait // 3. 如果请求达到了最大连接数,则wait直到连接数降下来 countUpOrAwaitConnection();
SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket // 4. 接受下一次连接的socket socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0;
// Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // 5. `setSocketOptions()`这儿是关键,会将socket以事件的方式传递给poller if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } }
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // 将channel注册到poller,注意关键的两个方法,`getPoller0()`和`Poller.register()` getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
/** * The socket poller. */ private Poller[] pollers = null; private AtomicInteger pollerRotater = new AtomicInteger(0); /** * Return an available poller in true round robin fashion. * * @return The next poller in sequence */ public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
/** * Registers a newly created socket with the poller. * * @param socket The newly created socket */ public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { //直接调用run方法 pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }
@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false;
try { if (!close) { /执行PollerEvent的run方法 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// 获取当前选择器中所有注册的“选择键(已就绪的监听事件)” Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. // 对已经准备好的key进行处理 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 真正处理key的地方 processKey(sk, attachment); } }//while
/** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { super(socketWrapper, event); }
try { if (key != null) { if (socket.isHandshakeComplete()) { // No TLS handshaking required. Let the handler // process this socket / event combination. handshake = 0; } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT || event == SocketEvent.ERROR) { // Unable to complete the TLS handshake. Treat it as // if the handshake failed. handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket // 将处理逻辑交给`Handler`处理,当event为null时,则表明是一个`OPEN_READ`事件 if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } else if (handshake == -1 ) { close(socket, key); } else if (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("", t); socket.getPoller().cancelledKey(key); } finally { socketWrapper = null; event = null; //return to cache if (running && !paused) { processorCache.push(this); } } } }
// Associate the processor with the connection connections.put(socket, processor);
SocketState state = SocketState.CLOSED; do { // 关键的代码,终于找到你了 state = processor.process(wrapper, status);
} while ( state == SocketState.UPGRADING); return state; } catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. getLog().error(sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); }
// Make sure socket/processor is removed from the list of current // connections connections.remove(socket); release(processor); return SocketState.CLOSED; }
public abstract class AbstractProcessorLight implements Processor {
@Override public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { DispatchType nextDispatch = dispatches.next(); state = dispatch(nextDispatch.getSocketStatus()); } else if (status == SocketEvent.DISCONNECT) { // Do nothing here, just wait for it to get recycled } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); if (state == SocketState.OPEN) { // There may be pipe-lined data to read. If the data isn't // processed now, execution will exit this loop and call // release() which will recycle the processor (and input // buffer) deleting any pipe-lined data. To avoid this, // process it now. state = service(socketWrapper); } } else if (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } else if (status == SocketEvent.OPEN_READ){ // 调用`service()`方法 state = service(socketWrapper); } else { // Default to closing the socket if the SocketEvent passed in // is not consistent with the current state of the Processor state = SocketState.CLOSED; }
if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); }
if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } }
if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);
try { // Parse and set Catalina and configuration specific // request parameters // 3. 解析请求,该方法会出现代理服务器、设置必要的header等操作 // 用来处理请求映射 (获取 host, context, wrapper, URI 后面的参数的解析, sessionId ) postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async request.setAsyncSupported( connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container // 4. 真正进入容器的地方,调用Engine容器下pipeline的阀门 connector.getService().getContainer().getPipeline().getFirst().invoke( request, response); } if (request.isAsync()) { async = true; ReadListener readListener = req.getReadListener(); if (readListener != null && request.isFinished()) { // Possible the all data may have been read during service() // method so this needs to be checked here ClassLoader oldCL = null; try { oldCL = request.getContext().bind(false, null); if (req.sendAllDataReadEvent()) { req.getReadListener().onAllDataRead(); } } finally { request.getContext().unbind(false, oldCL); } }
// If an async request was started, is not going to end once // this container thread finishes and an error occurred, trigger // the async error process if (!request.isAsyncCompleting() && throwable != null) { request.getAsyncContextInternal().setErrorState(throwable, true); } } else { //5. 通过request.finishRequest 与 response.finishResponse(刷OutputBuffer中的数据到浏览器) 来完成整个请求 request.finishRequest(); //将 org.apache.catalina.connector.Response对应的 OutputBuffer 中的数据 刷到 org.apache.coyote.Response 对应的 InternalOutputBuffer 中, 并且最终调用 socket对应的 outputStream 将数据刷出去( 这里会组装 Http Response 中的 header 与 body 里面的数据, 并且刷到远端 ) response.finishResponse(); }
if (request.isAsyncCompleting() && error.get()) { // Connection will be forcibly closed which will prevent // completion happening at the usual point. Need to trigger // call to onComplete() here. res.action(ActionCode.ASYNC_POST_PROCESS, null); async = false; }
// Access log if (!async && postParseSuccess) { // Log only if processing was invoked. // If postParseRequest() failed, it has already logged it. Context context = request.getContext(); // If the context is null, it is likely that the endpoint was // shutdown, this connection closed and the request recycled in // a different thread. That thread will have updated the access // log so it is OK not to update the access log here in that // case. if (context != null) { context.logAccess(request, response, System.currentTimeMillis() - req.getStartTime(), false); } }
if (undecodedURI.getType() == MessageBytes.T_BYTES) { // Copy the raw URI to the decodedURI decodedURI.duplicate(undecodedURI);
// Parse the path parameters. This will: // - strip out the path parameters // - convert the decodedURI to bytes parsePathParameters(req, request);
// URI decoding // %xx decoding of the URL try { req.getURLDecoder().convert(decodedURI, false); } catch (IOException ioe) { res.setStatus(400); res.setMessage("Invalid URI: " + ioe.getMessage()); connector.getService().getContainer().logAccess( request, response, 0, true); return false; } // Normalization if (!normalize(req.decodedURI())) { res.setStatus(400); res.setMessage("Invalid URI"); connector.getService().getContainer().logAccess( request, response, 0, true); return false; } // Character decoding convertURI(decodedURI, request); // Check that the URI is still normalized if (!checkNormalize(req.decodedURI())) { res.setStatus(400); res.setMessage("Invalid URI character encoding"); connector.getService().getContainer().logAccess( request, response, 0, true); return false; } } else { /* The URI is chars or String, and has been sent using an in-memory * protocol handler. The following assumptions are made: * - req.requestURI() has been set to the 'original' non-decoded, * non-normalized URI * - req.decodedURI() has been set to the decoded, normalized form * of req.requestURI() */ decodedURI.toChars(); // Remove all path parameters; any needed path parameter should be set // using the request object rather than passing it in the URL CharChunk uriCC = decodedURI.getCharChunk(); int semicolon = uriCC.indexOf(';'); if (semicolon > 0) { decodedURI.setChars (uriCC.getBuffer(), uriCC.getStart(), semicolon); } }
// Request mapping. MessageBytes serverName; if (connector.getUseIPVHosts()) { serverName = req.localName(); if (serverName.isNull()) { // well, they did ask for it res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null); } } else { serverName = req.serverName(); }
// Version for the second mapping loop and // Context that we expect to get for that version String version = null; Context versionContext = null; boolean mapRequired = true;
while (mapRequired) { // This will map the the latest version by default connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()); // 省略部分代码 } // 省略部分代码 }