tomcatweb 管理(Tomcat源码解析之Web请求与处理)
tomcatweb 管理
- 前言
- 一、EndPoint
- 二、ConnectionHandler
- 三、Coyote
- 四、容器责任链模式
先将当前请求封装成PollerEvent,new PollerEvent(socket, ka, OP_REGISTER);
(2)Poller线程会一直遍历可以处理的事件(netty的selestor),当找到需要处理的事件之后,调用processKey(sk, socketWrapper);
public void run() { if (interestOps == OP_REGISTER) { try { // 核心代码,终于找到了!!!!! // 当事件是注册的时候,将当前的NioSocketChannel注册到Poller的Selector上。 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. // SelectionKey被取消的时候需要将SelectionKey对应的EndPoint的Connection计数器,减一 socket.socketWrapper.getEndpoint().countDownConnection(); ((NioSocketWrapper) socket.socketWrapper).closed = true; } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) { } } } }
(4)Poller线程内会执行keyCount =;
// 得到selectedKeys的迭代器 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 遍历所有的SelectionKey,并对其进行处理 while (iterator != null && iterator.hasNext()) { SelectionKey sk =; iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() // 如果有attachment,就处理 if (socketWrapper != null) { // 处理事件 processKey(sk, socketWrapper); } }
processKey在处理SelectionKey,如果当前Poller已经关闭,就取消key。SelectionKey对应的Channel如果发生读事件,就调用AbatractEndPoint.processSocket执行读操作processSocket(attachment, SocketEvent.OPEN_READ, true)
,如果SelectionKey对应的Channel发生写事件,就执行processSocket(attachment, SocketEvent.OPEN_WRITE, true)
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if (close) { // 如果Poller已经关闭了,就取消key cancelledKey(sk); } else if (sk.isValid() && attachment != null) { if (sk.isReadable() || sk.isWritable()) { if (attachment.getSendfileData() != null) { processSendfile(sk, attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write // 读优于写 // 如果SelectionKey对应的Channel已经准备好了读 // 就对NioSocketWrapper进行读操作 if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } // 如果SelectionKey对应的Channel已经准备好了写 // 就对NioSocketWrapper进行写操作 if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { // 如果已经关闭了,就取消key cancelledKey(sk); } } } }
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { // 得到socket的处理器 // Connector在构造函数里面已经指定了协议:org.apache.coyote.http11.Http11NioProtocol。 SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { // 如果没有,就创建一个Socket的处理器。创建的时候指定socketWrapper以及socket的事件。 sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } //socket的处理交给了线程池去处理。 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else {; }
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> { private final NioSelectorPool pool; private Poller poller = null; // 轮询的Poller private int interestOps = 0; private CountDownLatch readLatch = null; private CountDownLatch writeLatch = null; private volatile SendfileData sendfileData = null; private volatile long lastRead = System.currentTimeMillis(); private volatile long lastWrite = lastRead; private volatile boolean closed = false;
SocketProcessor的doRun方法,会根据SocketState进行处理,SocketState 为STOP、DISCONNECT或者ERROR的时候就进行关闭,SocketWrapperBase对应的selector事件,得到指定的Handler处理器进行处理。
@Override protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { int handshake = -1; try { if (key != null) { if (socket.isHandshakeComplete()) { // 是否已经握手成功,不需要TLS(加密)握手,就让处理器对socket和event的组合进行处理。 handshake = 0; } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT || event == SocketEvent.ERROR) { // 不能够完成TLS握手,就把他认为是TLS握手失败。 handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. // 握手从/向socket读/写时,握手一旦完成状态应该为OPEN_WRITE, // 握手是在套接字打开时发生的,因此在完成后状态必须始终为OPEN_READ // 始终设置此选项是可以的,因为它仅在握手完成时使用。 event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (event == null) { // 调用处理器进行处理。 // NioEndPoint的默认Handler是Http11的 // 这里的Handler是AbstractProtocol.ConnectionHandler // 这个Handler的设置方法是: // 首先在Connector类的构造函数中,将默认的ProtocolHandler设置为org.apache.coyote.http11.Http11NioProtocol // AbstractHttp11Protocol的构造函数里面创建了Handler类ConnectionHandler state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } // 如果返回的状态是SocketState,那么就关掉连接 if (state == SocketState.CLOSED) { close(socket, key); } } else if (handshake == -1) { getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL); close(socket, key); } else if (handshake == SelectionKey.OP_READ) { // 如果是SelectionKey.OP_READ,也就是读事件的话,就将OP_READ时间设置到socketWrapper socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE) { // 如果是SelectionKey.OP_WRITE,也就是读事件的话,就将OP_WRITE事件设置到socketWrapper socketWrapper.registerWriteInterest(); }
上面是SocketProcessor的doRun方法,执行了getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
;process方法是首先在Map缓存中查找当前socket是否存在对应的processor,如果不存在,再去可循环的处理器栈中查找是否存在,如果不存在就创建相应的Processor,然后将新创建的Processor与Socket建立映射,存在connection的Map中。在任何一个阶段得到Processor对象之后,会执行processor的process方法state = processor.process(wrapper, status);
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> { private final AbstractProtocol<S> proto; private final RequestGroupInfo global = new RequestGroupInfo(); private final AtomicLong registerCount = new AtomicLong(0); // 终于找到了这个集合,给Socket和处理器建立连接 // 对每个有效链接都会缓存进这里,用于连接选择一个合适的Processor实现以进行请求处理。 private final Map<S, Processor> connections = new ConcurrentHashMap<>(); // 可循环的处理器栈 private final RecycledProcessors recycledProcessors = new RecycledProcessors(this); @Override public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // wrapper == null 表示Socket已经被关闭了,所以不需要做任何操作。 return SocketState.CLOSED; } // 得到wrapper内的Socket对象 S socket = wrapper.getSocket(); // 从Map缓冲区中得到socket对应的处理器。 Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet", processor, socket)); } // Timeouts are calculated on a dedicated thread and then // dispatched. Because of delays in the dispatch process, the // timeout may no longer be required. Check here and avoid // unnecessary processing. // 超时是在专用线程上计算的,然后被调度。 // 因为调度过程中的延迟,可能不再需要超时。检查这里,避免不必要的处理。 if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() && !processor.isUpgrade() || processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) { // This is effectively a NO-OP return SocketState.OPEN; } // 如果Map缓存存在该socket相关的处理器 if (processor != null) { // Make sure an async timeout doesn't fire // 确保没有触发异步超时 getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. // SocketEvent事件是关闭,或者SocketEvent时间出错,此时不需要做任何操作。 // Endpoint需要一个CLOSED的信号,并且这里不再有与这个socket有关联了 return SocketState.CLOSED; } ContainerThreadMarker.set(); try { // Map缓存不存在该socket相关的处理器 if (processor == null) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); // OpenSSL typically returns null whereas JSSE typically // returns "" when no protocol is negotiated // OpenSSL通常返回null,而JSSE通常在没有协议协商时返回"" if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) { // 获取协商协议 UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { // 升级协议为空 processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter()); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor)); } } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. } else { // TODO: // OpenSSL 1.0.2's ALPN callback doesn't support // failing the handshake with an error if no // protocol can be negotiated. Therefore, we need to // fail the connection here. Once this is fixed, // replace the code below with the commented out // block. if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("", negotiatedProtocol)); } return SocketState.CLOSED; /* * To replace the code above once OpenSSL 1.1.0 is * used. // Failed to create processor. This is a bug. throw new IllegalStateException(sm.getString( "", negotiatedProtocol)); */ } } } // 经过上面的操作,processor还是null的话。 if (processor == null) { // 从recycledProcessors可循环processors中获取processor processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { // 创建处理器 processor = getProtocol().createProcessor(); register(processor); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor)); } } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); // 将socket和processor建立关联。 connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { // 调用processor的process方法。 state = processor.process(wrapper, status); // processor的process方法返回升级状态 if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler // 得到HTTP的升级句柄 UpgradeToken upgradeToken = processor.getUpgradeToken(); // Retrieve leftover input // 检索剩余输入 ByteBuffer leftOverInput = processor.getLeftoverInput(); if (upgradeToken == null) { // Assume direct HTTP/2 connection UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c"); if (upgradeProtocol != null) { // Release the Http11 processor to be re-used release(processor); // Create the upgrade processor processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter()); wrapper.unRead(leftOverInput); // Associate with the processor with the connection connections.put(socket, processor); } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "", "h2c")); } // Exit loop and trigger appropriate clean-up state = SocketState.CLOSED; } } else { HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(processor); // Create the upgrade processor processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate", processor, wrapper)); } wrapper.unRead(leftOverInput); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } } } } while (state == SocketState.UPGRADING);
(2)以Http11协议为例,执行的是Http11Processor,Http11Processor的祖父类AbstractProcessorLight实现了process方法,process调用了service模板方法,service模板方法是由Http11Processor进行实现的。service方法最重要的操作是执行getAdapter().service(request, response);
@Override public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException { // 上面省略n行 // 调用Coyote的service方法 getAdapter().service(request, response); // 下面省略n行
@Override public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException { // 上面省略n行 // 调用Coyote的service方法 getAdapter().service(request, response); // 下面省略n行
@Override public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException { // 上面省略n行 // 调用Coyote的service方法 getAdapter().service(request, response); // 下面省略n行
@Override public final void invoke(Request request, Response response) throws IOException, ServletException { // 初始化我们需要的本地变量 boolean unavailable = false; Throwable throwable = null; // This should be a Request attribute... long t1 = System.currentTimeMillis(); // 原子类AtomicInteger,CAS操作,表示请求的数量。 requestCount.incrementAndGet(); StandardWrapper wrapper = (StandardWrapper) getContainer(); Servlet servlet = null; Context context = (Context) wrapper.getParent(); // 检查当前的Context应用是否已经被标注为不可以使用 if (!context.getState().isAvailable()) { // 如果当前应用不可以使用的话,就报503错误。 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, sm.getString("standardContext.isUnavailable")); unavailable = true; } // 检查Servelt是否被标记为不可使用 if (!unavailable && wrapper.isUnavailable()) { container.getLogger().info(sm.getString("standardWrapper.isUnavailable", wrapper.getName())); long available = wrapper.getAvailable(); if ((available > 0L) && (available < Long.MAX_VALUE)) { response.setDateHeader("Retry-After", available); response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, sm.getString("standardWrapper.isUnavailable", wrapper.getName())); } else if (available == Long.MAX_VALUE) { response.sendError(HttpServletResponse.SC_NOT_FOUND, sm.getString("standardWrapper.notFound", wrapper.getName())); } unavailable = true; } // Servelt是第一次调用的时候初始化 try { if (!unavailable) { // 如果此时Servelt还没有被初始化,就分配一个Servelt实例来处理request请求。 servlet = wrapper.allocate(); } /// 省略代码.......................................... // // 给该request创建Filter过滤链。Filter过滤链执行完之后,会执行Servelt ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet); // Call the filter chain for this request // NOTE: This also calls the servlet's service() method try { if ((servlet != null) && (filterChain != null)) { // Swallow output if needed if (context.getSwallowOutput()) { try { SystemLogHandler.startCapture(); if (request.isAsyncDispatching()) { request.getAsyncContextInternal().doInternalDispatch(); } else { // 调用过滤链 filterChain.doFilter(request.getRequest(), response.getResponse()); } /// 省略代码..........................................
- 如何让tomcat启动更快(快速解决Tomcat启动慢的问题,超简单)
- spring boot 如何启动tomcat(传统tomcat启动服务与springboot启动内置tomcat服务的区别推荐)
- idea里面怎么配置tomcat(intellij idea 使用Tomcat部署的项目位置在哪)
- tomcat运行警告(tomcat异常解决Invalid character found in the request target. The valid characters are defined in)
- idea的tomcat怎么运行项目(idea配置tomcat启动web项目的图文教程)
- apache服务部署tomcat(Apache与Tomcat服务器整合的基本配置方法及概要说明)
- idea如何运行tomcat项目(在IDEA 2020.3.1中部署Tomcat并且创建第一个web项目的过程详解)
- tomcat启动闪退拒绝访问(详解Tomcat双击startup.bat闪退的解决方法)
- tomcatcpu配置(Tomcat进程占用CPU过高的解决方法)
- 如何使用docker启动tomcat(Docker容器上用DockerFile部署多个tomcat服务的步骤)
- 如何设置tomcat的jvm(Tomcatc3p0配置jnid数据源2种实现方法解析)
- docker容器如何更改tomcat端口(Docker方式启动tomcat访问首页出现404错误)
- tomcat服务出现乱码(解决Tomcat10 Catalina log乱码问题)
- tomcat环境配置教程(Tomcat服务器的安装配置图文教程推荐)
- tomcat的部署和优化(看看Tomcat安装、配置、优化及负载均衡)
- apache tomcat安装教程(Apache及Tomcat搭建集群环境过程解析)
- 战 疫 时刻 致敬每一位石化大学的 守护者(战疫时刻)
- 老弄堂里的市井味道(老弄堂里的市井味道)
- 小米音乐可绑定QQ音乐, QQ音乐 真的会消失在小米的设备上吗(小米音乐可绑定QQ音乐)
- 小米Watch S1评测 或许能成为小米冲击高端可穿戴设备的里程碑(小米WatchS1评测或许能成为小米冲击高端可穿戴设备的里程碑)
- 手机QQ与小米路由器在一起 明天揭晓,敬请期待(手机QQ与小米路由器在一起)
- 小米音乐与 QQ 音乐合作,便捷迁移会员(小米音乐与QQ音乐合作)
- SqlServer2016模糊匹配的三种方式及效率问题简析(SqlServer2016模糊匹配的三种方式及效率问题简析)
- html5canvas动画(html5 canvas 实现光线沿不规则路径运动)
- html5的canvas代码(H5最强接口之canvas实现动态图形功能)
- pythonrequests框架实例(Python requests模块实例用法)
- 云主机和云虚拟主机有什么区别(什么是云主机?云主机是独立空间吗?)
- VS不生成.vhost.exe和.pdb文件
- dedecms缩略图插件(织梦DEDECMS有缩略图显示,没有显示随机指定图片的实现方法)
- vmwareworkstationlinux修改配置(VMware Workstation Pro 16搭建CentOS8虚拟机集群的图文教程)
- css实现很炫酷的效果(纯CSS实现酷炫的霓虹灯效果附demo)
- dedecms安全验证(dede验证码错误 dede验证码一直提示错误的解决方法)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9