本系列文章转自极客时间大佬李号双的专栏《深入拆解Tomcat & Jetty》

链接:https://time.geekbang.org/column/intro/100027701

仅供自己学习,联系我,侵删。

14 | NioEndpoint组件:Tomcat如何实现非阻塞I/O?

UNIX 系统下的 I/O 模型有 5 种:同步阻塞 I/O、同步非阻塞 I/O、I/O 多路复用、信号驱动 I/O 和异步 I/O。这些名词我们好像都似曾相识,但这些 I/O 通信模型有什么区别?同步和阻塞似乎是一回事,到底有什么不同?等一下,在这之前你是不是应该问自己一个终极问题:什么是 I/O?为什么需要这些 I/O 模型?

所谓的I/O 就是计算机内存与外部设备之间拷贝数据的过程。我们知道 CPU 访问内存的速度远远高于外部设备,因此 CPU 是先把外部设备的数据读到内存里,然后再进行处理。请考虑一下这个场景,当你的程序通过 CPU 向外部设备发出一个读指令时,数据从外部设备拷贝到内存往往需要一段时间,这个时候 CPU 没事干了,你的程序是主动把 CPU 让给别人?还是让 CPU 不停地查:数据到了吗,数据到了吗……

这就是 I/O 模型要解决的问题。今天我会先说说各种 I/O 模型的区别,然后重点分析 Tomcat 的 NioEndpoint 组件是如何实现非阻塞 I/O 模型的。

Java I/O 模型

对于一个网络 I/O 通信过程,比如网络数据读取,会涉及两个对象,一个是调用这个 I/O 操作的用户线程,另外一个就是操作系统内核。一个进程的地址空间分为用户空间和内核空间,用户线程不能直接访问内核空间。

当用户线程发起 I/O 操作后,网络数据读取操作会经历两个步骤:

  • 用户线程等待内核将数据从网卡拷贝到内核空间。
  • 内核将数据从内核空间拷贝到用户空间。

各种 I/O 模型的区别就是:它们实现这两个步骤的方式是不一样的。

同步阻塞 I/O:用户线程发起 read 调用后就阻塞了,让出 CPU。内核等待网卡数据到来,把数据从网卡拷贝到内核空间,接着把数据拷贝到用户空间,再把用户线程叫醒。

同步非阻塞 I/O:用户线程不断的发起 read 调用,数据没到内核空间时,每次都返回失败,直到数据到了内核空间,这一次 read 调用后,在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的,等数据到了用户空间再把线程叫醒。

I/O 多路复用:用户线程的读取操作分成两步了,线程先发起 select 调用,目的是问内核数据准备好了吗?等内核把数据准备好了,用户线程再发起 read 调用。在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的。那为什么叫 I/O 多路复用呢?因为一次 select 调用可以向内核查多个数据通道(Channel)的状态,所以叫多路复用。

异步 I/O:用户线程发起 read 调用的同时注册一个回调函数,read 立即返回,等内核将数据准备好后,再调用指定的回调函数完成处理。在这个过程中,用户线程一直没有阻塞。

NioEndpoint 组件

Tomcat 的 NioEndPoint 组件实现了 I/O 多路复用模型,接下来我会介绍 NioEndpoint 的实现原理,下一期我会介绍 Tomcat 如何实现异步 I/O 模型。

总体工作流程

我们知道,对于 Java 的多路复用器的使用,无非是两步:

\1. 创建一个 Seletor,在它身上注册各种感兴趣的事件,然后调用 select 方法,等待感兴趣的事情发生。

\2. 感兴趣的事情发生了,比如可以读了,这时便创建一个新的线程从 Channel 中读数据。

Tomcat 的 NioEndpoint 组件虽然实现比较复杂,但基本原理就是上面两步。我们先来看看它有哪些组件,它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor 和 Executor 共 5 个组件,它们的工作过程如下图所示。

LimitLatch 是连接控制器,它负责控制最大连接数,NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝。

Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。

Poller 的本质是一个 Selector,也跑在单独线程里。Poller 在内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。

Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。我们知道,Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

接下来我详细介绍一下各组件的设计特点。

LimitLatch

LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。LimitLatch 的核心代码如下:

public class LimitLatch {
    private class Sync extends AbstractQueuedSynchronizer {
     
        @Override
        protected int tryAcquireShared() {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
 
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }
 
    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    
    // 线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
    public void countUpOrAwait() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
 
    // 调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
    public long countDown() {
      sync.releaseShared(0);
      long result = getCount();
      return result;
   }
}

从上面的代码我们看到,LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。我们可以扩展它来实现自己的同步器,实际上 Java 并发包里的锁和条件变量等等都是通过 AQS 来实现的,而这里的 LimitLatch 也不例外。

理解上面的代码时有两个要点:

  1. 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。

  2. 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的releaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

其实你会发现 AQS 就是一个骨架抽象类,它帮我们搭了个架子,用来控制线程的阻塞和唤醒。具体什么时候阻塞、什么时候唤醒由你来决定。我们还注意到,当前线程数被定义成原子变量 AtomicLong,而 limit 变量用 volatile 关键字来修饰,这些并发编程的实际运用。

Acceptor

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。初始化过程如下:

serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);

从上面的初始化代码我们可以看到两个关键信息:

1.bind 方法的第二个参数表示操作系统的等待队列长度,我在上面提到,当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。

2.ServerSocketChannel 被设置成阻塞模式,也就是说它是以阻塞的方式接收连接的。

ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的生产者 - 消费者模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

Poller

Poller 本质是一个 Selector,它内部维护一个 Queue,这个 Queue 定义如下:

private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 Synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。

SocketProcessor

我们知道,Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,这里请你注意:

Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

Executor

Executor 是 Tomcat 定制版的线程池,它负责创建真正干活的工作线程,干什么活呢?就是执行 SocketProcessor 的 run 方法,也就是解析请求并通过容器来处理请求,最终会调用到我们的 Servlet。后面我会用专门的篇幅介绍 Tomcat 怎么扩展和使用 Java 原生的线程池。

高并发思路

在弄清楚 NioEndpoint 的实现原理后,我们来考虑一个重要的问题,怎么把这个过程做到高并发呢?

高并发就是能快速地处理大量的请求,需要合理设计线程模型让 CPU 忙起来,尽量不要让线程阻塞,因为一阻塞,CPU 就闲下来了。另外就是有多少任务,就用相应规模的线程数去处理。我们注意到 NioEndpoint 要完成三件事情:接收连接、检测 I/O 事件以及处理请求,那么最核心的就是把这三件事情分开,用不同规模的线程去处理,比如用专门的线程组去跑 Acceptor,并且 Acceptor 的个数可以配置;用专门的线程组去跑 Poller,Poller 的个数也可以配置;最后具体任务的执行也由专门的线程池来处理,也可以配置线程池的大小。

本期精华

I/O 模型是为了解决内存和外部设备速度差异的问题。我们平时说的阻塞或非阻塞是指应用程序在发起 I/O 操作时,是立即返回还是等待。而同步和异步,是指应用程序在与内核通信时,数据从内核空间到应用空间的拷贝,是由内核主动发起还是由应用程序来触发。

在 Tomcat 中,EndPoint 组件的主要工作就是处理 I/O,而 NioEndpoint 利用 Java NIO API 实现了多路复用 I/O 模型。其中关键的一点是,读写数据的线程自己不会阻塞在 I/O 等待上,而是把这个工作交给 Selector。同时 Tomcat 在这个过程中运用到了很多 Java 并发编程技术,比如 AQS、原子类、并发容器,线程池等,都值得我们去细细品味。

15 | Nio2Endpoint组件:Tomcat如何实现异步I/O?

我在专栏上一期里提到了 5 种 I/O 模型,相应的,Java 提供了 BIO、NIO 和 NIO.2 这些 API 来实现这些 I/O 模型。BIO 是我们最熟悉的同步阻塞,NIO 是同步非阻塞,那 NIO.2 又是什么呢?NIO 已经足够好了,为什么还要 NIO.2 呢?

NIO 和 NIO.2 最大的区别是,一个是同步一个是异步。我在上期提到过,异步最大的特点是,应用程序不需要自己去触发数据从内核空间到用户空间的拷贝

为什么是应用程序去“触发”数据的拷贝,而不是直接从内核拷贝数据呢?这是因为应用程序是不能访问内核空间的,因此数据拷贝肯定是由内核来做,关键是谁来触发这个动作。

是内核主动将数据拷贝到用户空间并通知应用程序。还是等待应用程序通过 Selector 来查询,当数据就绪后,应用程序再发起一个 read 调用,这时内核再把数据从内核空间拷贝到用户空间。

需要注意的是,数据从内核空间拷贝到用户空间这段时间,应用程序还是阻塞的。所以你会看到异步的效率是高于同步的,因为异步模式下应用程序始终不会被阻塞。下面我以网络数据读取为例,来说明异步模式的工作过程。

首先,应用程序在调用 read API 的同时告诉内核两件事情:数据准备好了以后拷贝到哪个 Buffer,以及调用哪个回调函数去处理这些数据。

之后,内核接到这个 read 指令后,等待网卡数据到达,数据到了后,产生硬件中断,内核在中断程序里把数据从网卡拷贝到内核空间,接着做 TCP/IP 协议层面的数据解包和重组,再把数据拷贝到应用程序指定的 Buffer,最后调用应用程序指定的回调函数。

你可能通过下面这张图来回顾一下同步与异步的区别:

我们可以看到在异步模式下,应用程序当了“甩手掌柜”,内核则忙前忙后,但最大限度提高了 I/O 通信的效率。Windows 的 IOCP 和 Linux 内核 2.6 的 AIO 都提供了异步 I/O 的支持,Java 的 NIO.2 API 就是对操作系统异步 I/O API 的封装。

Java NIO.2 回顾

今天我们会重点关注 Tomcat 是如何实现异步 I/O 模型的,但在这之前,我们先来简单回顾下如何用 Java 的 NIO.2 API 来编写一个服务端程序。

public class Nio2Server {
 
   void listen(){
      //1. 创建一个线程池
      ExecutorService es = Executors.newCachedThreadPool();
 
      //2. 创建异步通道群组
      AsynchronousChannelGroup tg = AsynchronousChannelGroup.withCachedThreadPool(es, 1);
      
      //3. 创建服务端异步通道
      AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(tg);
 
      //4. 绑定监听端口
      assc.bind(new InetSocketAddress(8080));
 
      //5. 监听连接,传入回调类处理连接请求
      assc.accept(this, new AcceptHandler()); 
   }
}

上面的代码主要做了 5 件事情:

  1. 创建一个线程池,这个线程池用来执行来自内核的回调请求。
  2. 创建一个 AsynchronousChannelGroup,并绑定一个线程池。
  3. 创建 AsynchronousServerSocketChannel,并绑定到 AsynchronousChannelGroup。
  4. 绑定一个监听端口。
  5. 调用 accept 方法开始监听连接请求,同时传入一个回调类去处理连接请求。请你注意,accept 方法的第一个参数是 this 对象,就是 Nio2Server 对象本身,我在下文还会讲为什么要传入这个参数。

你可能会问,为什么需要创建一个线程池呢?其实在异步 I/O 模型里,应用程序不知道数据在什么时候到达,因此向内核注册回调函数,当数据到达时,内核就会调用这个回调函数。同时为了提高处理速度,会提供一个线程池给内核使用,这样不会耽误内核线程的工作,内核只需要把工作交给线程池就立即返回了。

我们再来看看处理连接的回调类 AcceptHandler 是什么样的。

//AcceptHandler 类实现了 CompletionHandler 接口的 completed 方法。它还有两个模板参数,第一个是异步通道,第二个就是 Nio2Server 本身
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Nio2Server> {
 
   // 具体处理连接请求的就是 completed 方法,它有两个参数:第一个是异步通道,第二个就是上面传入的 NioServer 对象
   @Override
   public void completed(AsynchronousSocketChannel asc, Nio2Server attachment) {      
      // 调用 accept 方法继续接收其他客户端的请求
      attachment.assc.accept(attachment, this);
      
      //1. 先分配好 Buffer,告诉内核,数据拷贝到哪里去
      ByteBuffer buf = ByteBuffer.allocate(1024);
      
      //2. 调用 read 函数读取数据,除了把 buf 作为参数传入,还传入读回调类
      channel.read(buf, buf, new ReadHandler(asc)); 
 
}

我们看到它实现了 CompletionHandler 接口,下面我们先来看看 CompletionHandler 接口的定义。

public interface CompletionHandler<V,A> {
 
    void completed(V result, A attachment);
 
    void failed(Throwable exc, A attachment);
}

CompletionHandler 接口有两个模板参数 V 和 A,分别表示 I/O 调用的返回值和附件类。比如 accept 的返回值就是 AsynchronousSocketChannel,而附件类由用户自己决定,在 accept 的调用中,我们传入了一个 Nio2Server。因此 AcceptHandler 带有了两个模板参数:AsynchronousSocketChannel 和 Nio2Server。

CompletionHandler 有两个方法:completed 和 failed,分别在 I/O 操作成功和失败时调用。completed 方法有两个参数,其实就是前面说的两个模板参数。也就是说,Java 的 NIO.2 在调用回调方法时,会把返回值和附件类当作参数传给 NIO.2 的使用者。

下面我们再来看看处理读的回调类 ReadHandler 长什么样子。

public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {   
    // 读取到消息后的处理  
    @Override  
    public void completed(Integer result, ByteBuffer attachment) {  
        //attachment 就是数据,调用 flip 操作,其实就是把读的位置移动最前面
        attachment.flip();  
        // 读取数据
        ... 
    }  
 
    void failed(Throwable exc, A attachment){
        ...
    }
}

read 调用的返回值是一个整型数,所以我们回调方法里的第一个参数就是一个整型,表示有多少数据被读取到了 Buffer 中。第二个参数是一个 ByteBuffer,这是因为我们在调用 read 方法时,把用来存放数据的 ByteBuffer 当作附件类传进去了,所以在回调方法里,有 ByteBuffer 类型的参数,我们直接从这个 ByteBuffer 里获取数据。

Nio2Endpoint

掌握了 Java NIO.2 API 的使用以及服务端程序的工作原理之后,再来理解 Tomcat 的异步 I/O 实现就不难了。我们先通过一张图来看看 Nio2Endpoint 有哪些组件。

从图上看,总体工作流程跟 NioEndpoint 是相似的。

LimitLatch 是连接控制器,它负责控制最大连接数。

Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannel,Nio2Acceptor 把 AsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。

Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。

需要你注意 Nio2Endpoint 跟 NioEndpoint 的一个明显不同点是,Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。

接下来我详细介绍一下 Nio2Endpoint 各组件的设计。

Nio2Acceptor

和 NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数,但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断的调 accept 方法,而是通过回调函数来完成的。我们来看看它的连接监听方法:

serverSock.accept(null, this);

其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。那么它是如何实现 CompletionHandler 接口的呢?

protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
    implements CompletionHandler<AsynchronousSocketChannel, Void> {
    
@Override
public void completed(AsynchronousSocketChannel socket,
        Void attachment) {
        
    if (isRunning() && !isPaused()) {
        if (getMaxConnections() == -1) {
            // 如果没有连接限制,继续接收新的连接
            serverSock.accept(null, this);
        } else {
            // 如果有连接限制,就在线程池里跑 Run 方法,Run 方法会检查连接数
            getExecutor().execute(this);
        }
        // 处理请求
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } 
}

可以看到 CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,我在前面说过第一个参数就是 accept 方法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。completed 方法的处理逻辑比较简单:

  • 如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。
  • 如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方法可能被阻塞,会导致这个回调方法一直不返回。

接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapper 和 SocketProcessor,并交给线程池处理。

Nio2SocketWrapper

Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。讲到这里你是不是有个疑问:Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据, 怎么办呢?这个请求的处理不就失败了吗?

为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

  • 第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着 SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。这里请注意,Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状态。
  • 第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用层的 Buffer。

这个回调类 readCompletionHandler 的源码如下,最关键的一点是,Nio2SocketWrapper 是作为附件类来传递的,这样在回调函数里能拿到所有的上下文。

this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
    public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        // 通过附件类 SocketWrapper 拿到所有的上下文
        Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
    }
 
    public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
        ...
    }
}

本期精华

在异步 I/O 模型里,内核做了很多事情,它把数据准备好,并拷贝到用户空间,再通知应用程序去处理,也就是调用应用程序注册的回调函数。Java 在操作系统 异步 IO API 的基础上进行了封装,提供了 Java NIO.2 API,而 Tomcat 的异步 I/O 模型就是基于 Java NIO.2 实现的。

由于 NIO 和 NIO.2 的 API 接口和使用方法完全不同,可以想象一个系统中如果已经支持同步 I/O,要再支持异步 I/O,改动是比较大的,很有可能不得不重新设计组件之间的接口。但是 Tomcat 通过充分的抽象,比如 SocketWrapper 对 Channel 的封装,再加上 Http11Processor 的两次 read 调用,巧妙地解决了这个问题,使得协议处理器 Http11Processor 和 I/O 通信处理器 Endpoint 之间的接口保持不变。

16 | AprEndpoint组件:Tomcat APR提高I/O性能的秘密

我们在使用 Tomcat 时,会在启动日志里看到这样的提示信息:

The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: ***

这句话的意思就是推荐你去安装 APR 库,可以提高系统性能。那什么是 APR 呢?

APR(Apache Portable Runtime Libraries)是 Apache 可移植运行时库,它是用 C 语言实现的,其目的是向上层应用程序提供一个跨平台的操作系统接口库。Tomcat 可以用它来处理包括文件和网络 I/O,从而提升性能。我在专栏前面提到过,Tomcat 支持的连接器有 NIO、NIO.2 和 APR。跟 NioEndpoint 一样,AprEndpoint 也实现了非阻塞 I/O,它们的区别是:NioEndpoint 通过调用 Java 的 NIO API 来实现非阻塞 I/O,而 AprEndpoint 是通过 JNI 调用 APR 本地库而实现非阻塞 I/O 的。

那同样是非阻塞 I/O,为什么 Tomcat 会提示使用 APR 本地库的性能会更好呢?这是因为在某些场景下,比如需要频繁与操作系统进行交互,Socket 网络通信就是这样一个场景,特别是如果你的 Web 应用使用了 TLS 来加密传输,我们知道 TLS 协议在握手过程中有多次网络交互,在这种情况下 Java 跟 C 语言程序相比还是有一定的差距,而这正是 APR 的强项。

Tomcat 本身是 Java 编写的,为了调用 C 语言编写的 APR,需要通过 JNI 方式来调用。JNI(Java Native Interface) 是 JDK 提供的一个编程接口,它允许 Java 程序调用其他语言编写的程序或者代码库,其实 JDK 本身的实现也大量用到 JNI 技术来调用本地 C 程序库。

在今天这一期文章,首先我会讲 AprEndpoint 组件的工作过程,接着我会在原理的基础上分析 APR 提升性能的一些秘密。在今天的学习过程中会涉及到一些操作系统的底层原理,毫无疑问掌握这些底层知识对于提高你的内功非常有帮助。

AprEndpoint 工作过程

下面我还是通过一张图来帮你理解 AprEndpoint 的工作过程。

你会发现它跟 NioEndpoint 的图很像,从左到右有 LimitLatch、Acceptor、Poller、SocketProcessor 和 Http11Processor,只是 Acceptor 和 Poller 的实现和 NioEndpoint 不同。接下来我分别来讲讲这两个组件。

Acceptor

Accpetor 的功能就是监听连接,接收并建立连接。它的本质就是调用了四个操作系统 API:socket、bind、listen 和 accept。那 Java 语言如何直接调用 C 语言 API 呢?答案就是通过 JNI。具体来说就是两步:先封装一个 Java 类,在里面定义一堆用native 关键字修饰的方法,像下面这样。

public class Socket {
  ...
  // 用 native 修饰这个方法,表明这个函数是 C 语言实现
  public static native long create(int family, int type,
                                 int protocol, long cont)
                                 
  public static native int bind(long sock, long sa);
  
  public static native int listen(long sock, int backlog);
  
  public static native long accept(long sock)
}

接着用 C 代码实现这些方法,比如 bind 函数就是这样实现的:

// 注意函数的名字要符合 JNI 规范的要求
JNIEXPORT jint JNICALL 
Java_org_apache_tomcat_jni_Socket_bind(JNIEnv *e, jlong sock,jlong sa)
	{
	    jint rv = APR_SUCCESS;
	    tcn_socket_t *s = (tcn_socket_t *)sock;
	    apr_sockaddr_t *a = (apr_sockaddr_t *) sa;
	
        // 调用 APR 库自己实现的 bind 函数
	    rv = (jint)apr_socket_bind(s->sock, a);
	    return rv;
	}

专栏里我就不展开 JNI 的细节了,你可以扩展阅读获得更多信息和例子。我们要注意的是函数名字要符合 JNI 的规范,以及 Java 和 C 语言如何互相传递参数,比如在 C 语言有指针,Java 没有指针的概念,所以在 Java 中用 long 类型来表示指针。AprEndpoint 的 Acceptor 组件就是调用了 APR 实现的四个 API。

Poller

Acceptor 接收到一个新的 Socket 连接后,按照 NioEndpoint 的实现,它会把这个 Socket 交给 Poller 去查询 I/O 事件。AprEndpoint 也是这样做的,不过 AprEndpoint 的 Poller 并不是调用 Java NIO 里的 Selector 来查询 Socket 的状态,而是通过 JNI 调用 APR 中的 poll 方法,而 APR 又是调用了操作系统的 epoll API 来实现的。

这里有个特别的地方是在 AprEndpoint 中,我们可以配置一个叫deferAccept的参数,它对应的是 TCP 协议中的TCP_DEFER_ACCEPT,设置这个参数后,当 TCP 客户端有新的连接请求到达时,TCP 服务端先不建立连接,而是再等等,直到客户端有请求数据发过来时再建立连接。这样的好处是服务端不需要用 Selector 去反复查询请求数据是否就绪。

这是一种 TCP 协议层的优化,不是每个操作系统内核都支持,因为 Java 作为一种跨平台语言,需要屏蔽各种操作系统的差异,因此并没有把这个参数提供给用户;但是对于 APR 来说,它的目的就是尽可能提升性能,因此它向用户暴露了这个参数。

APR 提升性能的秘密

APR 连接器之所以能提高 Tomcat 的性能,除了 APR 本身是 C 程序库之外,还有哪些提速的秘密呢?

JVM 堆 VS 本地内存

我们知道 Java 的类实例一般在 JVM 堆上分配,而 Java 是通过 JNI 调用 C 代码来实现 Socket 通信的,那么 C 代码在运行过程中需要的内存又是从哪里分配的呢?C 代码能否直接操作 Java 堆?

为了回答这些问题,我先来说说 JVM 和用户进程的关系。如果你想运行一个 Java 类文件,可以用下面的 Java 命令来执行。

java my.class

这个命令行中的java其实是一个可执行程序,这个程序会创建 JVM 来加载和运行你的 Java 类。操作系统会创建一个进程来执行这个java可执行程序,而每个进程都有自己的虚拟地址空间,JVM 用到的内存(包括堆、栈和方法区)就是从进程的虚拟地址空间上分配的。请你注意的是,JVM 内存只是进程空间的一部分,除此之外进程空间内还有代码段、数据段、内存映射区、内核空间等。从 JVM 的角度看,JVM 内存之外的部分叫作本地内存,C 程序代码在运行过程中用到的内存就是本地内存中分配的。下面我们通过一张图来理解一下。

Tomcat 的 Endpoint 组件在接收网络数据时需要预先分配好一块 Buffer,所谓的 Buffer 就是字节数组byte[],Java 通过 JNI 调用把这块 Buffer 的地址传给 C 代码,C 代码通过操作系统 API 读取 Socket 并把数据填充到这块 Buffer。Java NIO API 提供了两种 Buffer 来接收数据:HeapByteBuffer 和 DirectByteBuffer,下面的代码演示了如何创建两种 Buffer。

// 分配 HeapByteBuffer
ByteBuffer buf = ByteBuffer.allocate(1024);
 
// 分配 DirectByteBuffer
ByteBuffer buf = ByteBuffer.allocateDirect(1024);

创建好 Buffer 后直接传给 Channel 的 read 或者 write 函数,最终这块 Buffer 会通过 JNI 调用传递给 C 程序。

// 将 buf 作为 read 函数的参数
int bytesRead = socketChannel.read(buf);

那 HeapByteBuffer 和 DirectByteBuffer 有什么区别呢?HeapByteBuffer 对象本身在 JVM 堆上分配,并且它持有的字节数组byte[]也是在 JVM 堆上分配。但是如果用HeapByteBuffer来接收网络数据,需要把数据从内核先拷贝到一个临时的本地内存,再从临时本地内存拷贝到 JVM 堆,而不是直接从内核拷贝到 JVM 堆上。这是为什么呢?这是因为数据从内核拷贝到 JVM 堆的过程中,JVM 可能会发生 GC,GC 过程中对象可能会被移动,也就是说 JVM 堆上的字节数组可能会被移动,这样的话 Buffer 地址就失效了。如果这中间经过本地内存中转,从本地内存到 JVM 堆的拷贝过程中 JVM 可以保证不做 GC。

如果使用 HeapByteBuffer,你会发现 JVM 堆和内核之间多了一层中转,而 DirectByteBuffer 用来解决这个问题,DirectByteBuffer 对象本身在 JVM 堆上,但是它持有的字节数组不是从 JVM 堆上分配的,而是从本地内存分配的。DirectByteBuffer 对象中有个 long 类型字段 address,记录着本地内存的地址,这样在接收数据的时候,直接把这个本地内存地址传递给 C 程序,C 程序会将网络数据从内核拷贝到这个本地内存,JVM 可以直接读取这个本地内存,这种方式比 HeapByteBuffer 少了一次拷贝,因此一般来说它的速度会比 HeapByteBuffer 快好几倍。你可以通过上面的图加深理解。

Tomcat 中的 AprEndpoint 就是通过 DirectByteBuffer 来接收数据的,而 NioEndpoint 和 Nio2Endpoint 是通过 HeapByteBuffer 来接收数据的。你可能会问,NioEndpoint 和 Nio2Endpoint 为什么不用 DirectByteBuffer 呢?这是因为本地内存不好管理,发生内存泄漏难以定位,从稳定性考虑,NioEndpoint 和 Nio2Endpoint 没有去冒这个险。

sendfile

我们再来考虑另一个网络通信的场景,也就是静态文件的处理。浏览器通过 Tomcat 来获取一个 HTML 文件,而 Tomcat 的处理逻辑无非是两步:

  1. 从磁盘读取 HTML 到内存。
  2. 将这段内存的内容通过 Socket 发送出去。

但是在传统方式下,有很多次的内存拷贝:

  • 读取文件时,首先是内核把文件内容读取到内核缓冲区。
  • 如果使用 HeapByteBuffer,文件数据从内核到 JVM 堆内存需要经过本地内存中转。
  • 同样在将文件内容推入网络时,从 JVM 堆到内核缓冲区需要经过本地内存中转。
  • 最后还需要把文件从内核缓冲区拷贝到网卡缓冲区。

从下面的图你会发现这个过程有 6 次内存拷贝,并且 read 和 write 等系统调用将导致进程从用户态到内核态的切换,会耗费大量的 CPU 和内存资源。

而 Tomcat 的 AprEndpoint 通过操作系统层面的 sendfile 特性解决了这个问题,sendfile 系统调用方式非常简洁。

sendfile(socket, file, len);

它带有两个关键参数:Socket 和文件句柄。将文件从磁盘写入 Socket 的过程只有两步:

第一步:将文件内容读取到内核缓冲区。

第二步:数据并没有从内核缓冲区复制到 Socket 关联的缓冲区,只有记录数据位置和长度的描述符被添加到 Socket 缓冲区中;接着把数据直接从内核缓冲区传递给网卡。这个过程你可以看下面的图。

本期精华

对于一些需要频繁与操作系统进行交互的场景,比如网络通信,Java 的效率没有 C 语言高,特别是 TLS 协议握手过程中需要多次网络交互,这种情况下使用 APR 本地库能够显著提升性能。

除此之外,APR 提升性能的秘密还有:通过 DirectByteBuffer 避免了 JVM 堆与本地内存之间的内存拷贝;通过 sendfile 特性避免了内核与应用之间的内存拷贝以及用户态和内核态的切换。其实很多高性能网络通信组件,比如 Netty,都是通过 DirectByteBuffer 来收发网络数据的。由于本地内存难于管理,Netty 采用了本地内存池技术,感兴趣的同学可以深入了解一下。

17 | Executor组件:Tomcat如何扩展Java线程池?

在开发中我们经常会碰到“池”的概念,比如数据库连接池、内存池、线程池、常量池等。为什么需要“池”呢?程序运行的本质,就是通过使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理,比如在 JVM 中创建一个对象实例需要消耗 CPU 和内存资源,如果你的程序需要频繁创建大量的对象,并且这些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。

而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。在 Java 中万物皆对象,线程也是一个对象,Java 线程是对操作系统线程的封装,创建 Java 线程也需要消耗系统资源,因此就有了线程池。JDK 中提供了线程池的默认实现,我们也可以通过扩展 Java 原生线程池来实现自己的线程池。

同样,为了提高处理能力和并发度,Web 容器一般会把处理请求的工作放到线程池里来执行,Tomcat 扩展了原生的 Java 线程池,来满足 Web 容器高并发的需求,下面我们就来学习一下 Java 线程池的原理,以及 Tomcat 是如何扩展 Java 线程池的。

Java 线程池

简单的说,Java 线程池里内部维护一个线程数组和一个任务队列,当任务处理不过来的时,就把任务放到队列里慢慢处理。

ThreadPoolExecutor

我们先来看看 Java 线程池核心类 ThreadPoolExecutor 的构造函数,你需要知道 ThreadPoolExecutor 是如何使用这些参数的,这是理解 Java 线程工作原理的关键。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

每次提交任务时,如果线程数还没达到核心线程数corePoolSize,线程池就创建新线程来执行。当线程数达到corePoolSize后,新增的任务就放到工作队列workQueue里,而线程池中的线程则努力地从workQueue里拉活来干,也就是调用 poll 方法来获取任务。

如果任务很多,并且workQueue是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略handler,比如抛出异常或者由调用者线程来执行任务等。

如果高峰过去了,线程池比较闲了怎么办?临时线程使用 poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意 poll 方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。

那还有一个参数threadFactory是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。

FixedThreadPool/CachedThreadPool

Java 提供了一些默认的线程池实现,比如 FixedThreadPool 和 CachedThreadPool,它们的本质就是给 ThreadPoolExecutor 设置了不同的参数,是定制版的 ThreadPoolExecutor。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}
 
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从上面的代码你可以看到:

  • FixedThreadPool 有固定长度(nThreads)的线程数组,忙不过来时会把任务放到无限长的队列里,这是因为LinkedBlockingQueue 默认是一个无界队列
  • CachedThreadPool 的 maximumPoolSize 参数值是Integer.MAX_VALUE,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是SynchronousQueue,表明队列长度为 0。

Tomcat 线程池

跟 FixedThreadPool/CachedThreadPool 一样,Tomcat 的线程池也是一个定制版的 ThreadPoolExecutor。

定制版的 ThreadPoolExecutor

通过比较 FixedThreadPool 和 CachedThreadPool,我们发现它们传给 ThreadPoolExecutor 的参数有两个关键点:

  • 是否限制线程个数。
  • 是否限制队列长度。

对于 Tomcat 来说,这两个资源都需要限制,也就是说要对高并发进行控制,否则 CPU 和内存有资源耗尽的风险。因此 Tomcat 传入的参数是这样的:

// 定制版的任务队列
taskqueue = new TaskQueue(maxQueueSize);
 
// 定制版的线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
 
// 定制版的线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);

你可以看到其中的两个关键点:

  • Tomcat 有自己的定制版任务队列和线程工厂,并且可以限制任务队列的长度,它的最大长度是 maxQueueSize。
  • Tomcat 对线程数也有限制,设置了核心线程数(minSpareThreads)和最大线程池数(maxThreads)。

除了资源限制以外,Tomcat 线程池还定制自己的任务处理流程。我们知道 Java 原生线程池的任务处理逻辑比较简单:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 后面再来任务,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,执行拒绝策略。

Tomcat 线程池扩展了原生的 ThreadPoolExecutor,通过重写 execute 方法实现了自己的任务处理逻辑:

  1. 前 corePoolSize 个任务时,来一个任务就创建一个新线程。
  2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到 maximumPoolSize,则继续尝试把任务添加到任务队列中去。
  4. 如果缓冲队列也满了,插入失败,执行拒绝策略。

观察 Tomcat 线程池和 Java 原生线程池的区别,其实就是在第 3 步,Tomcat 在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。那具体如何实现呢,其实很简单,我们来看一下 Tomcat 线程池的 execute 方法的核心代码。

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  
  ...
  
  public void execute(Runnable command, long timeout, TimeUnit unit) {
      submittedCount.incrementAndGet();
      try {
          // 调用 Java 原生线程池的 execute 去执行任务
          super.execute(command);
      } catch (RejectedExecutionException rx) {
         // 如果总线程数达到 maximumPoolSize,Java 原生线程池执行拒绝策略
          if (super.getQueue() instanceof TaskQueue) {
              final TaskQueue queue = (TaskQueue)super.getQueue();
              try {
                  // 继续尝试把任务放到任务队列中去
                  if (!queue.force(command, timeout, unit)) {
                      submittedCount.decrementAndGet();
                      // 如果缓冲队列也满了,插入失败,执行拒绝策略。
                      throw new RejectedExecutionException("...");
                  }
              } 
          }
      }
}

从这个方法你可以看到,Tomcat 线程池的 execute 方法会调用 Java 原生线程池的 execute 去执行任务,如果总线程数达到 maximumPoolSize,Java 原生线程池的 execute 方法会抛出 RejectedExecutionException 异常,但是这个异常会被 Tomcat 线程池的 execute 方法捕获到,并继续尝试把这个任务放到任务队列中去;如果任务队列也满了,再执行拒绝策略。

定制版的任务队列

细心的你有没有发现,在 Tomcat 线程池的 execute 方法最开始有这么一行:

submittedCount.incrementAndGet();

这行代码的意思把 submittedCount 这个原子变量加一,并且在任务执行失败,抛出拒绝异常时,将这个原子变量减一:

submittedCount.decrementAndGet();

其实 Tomcat 线程池是用这个变量 submittedCount 来维护已经提交到了线程池,但是还没有执行完的任务个数。Tomcat 为什么要维护这个变量呢?这跟 Tomcat 的定制版的任务队列有关。Tomcat 的任务队列 TaskQueue 扩展了 Java 中的 LinkedBlockingQueue,我们知道 LinkedBlockingQueue 默认情况下长度是没有限制的,除非给它一个 capacity。因此 Tomcat 给了它一个 capacity,TaskQueue 的构造函数中有个整型的参数 capacity,TaskQueue 将 capacity 传给父类 LinkedBlockingQueue 的构造函数。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
 
  public TaskQueue(int capacity) {
      super(capacity);
  }
  ...
}

这个 capacity 参数是通过 Tomcat 的 maxQueueSize 参数来设置的,但问题是默认情况下 maxQueueSize 的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题,TaskQueue 重写了 LinkedBlockingQueue 的 offer 方法,在合适的时机返回 false,返回 false 表示任务添加失败,这时线程池会创建新的线程。那什么是合适的时机呢?请看下面 offer 方法的核心源码:

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
 
  ...
   @Override
  // 线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
  public boolean offer(Runnable o) {
 
      // 如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
      if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
          return super.offer(o);
          
      // 执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
      // 表明是可以创建新线程的,那到底要不要创建呢?分两种情况:
      
      //1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
      if (parent.getSubmittedCount()<=(parent.getPoolSize())) 
          return super.offer(o);
          
      //2. 如果已提交的任务数大于当前线程数,线程不够用了,返回 false 去创建新线程
      if (parent.getPoolSize()<parent.getMaximumPoolSize()) 
          return false;
          
      // 默认情况下总是把任务添加到任务队列
      return super.offer(o);
  }
  
}

从上面的代码我们看到,只有当前线程数大于核心线程数、小于最大线程数,并且已提交的任务个数大于当前线程数时,也就是说线程不够用了,但是线程数又没达到极限,才会去创建新的线程。这就是为什么 Tomcat 需要维护已提交任务数这个变量,它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程

当然默认情况下 Tomcat 的任务队列是没有限制的,你可以通过设置 maxQueueSize 参数来限制任务队列的长度。

本期精华

池化的目的是为了避免频繁地创建和销毁对象,减少对系统资源的消耗。Java 提供了默认的线程池实现,我们也可以扩展 Java 原生的线程池来实现定制自己的线程池,Tomcat 就是这么做的。Tomcat 扩展了 Java 线程池的核心类 ThreadPoolExecutor,并重写了它的 execute 方法,定制了自己的任务处理流程。同时 Tomcat 还实现了定制版的任务队列,重写了 offer 方法,使得在任务队列长度无限制的情况下,线程池仍然有机会创建新的线程。

18 | 新特性:Tomcat如何支持WebSocket?

我们知道 HTTP 协议是“请求 - 响应”模式,浏览器必须先发请求给服务器,服务器才会响应这个请求。也就是说,服务器不会主动发送数据给浏览器。

对于实时性要求比较的高的应用,比如在线游戏、股票基金实时报价和在线协同编辑等,浏览器需要实时显示服务器上最新的数据,因此出现了 Ajax 和 Comet 技术。Ajax 本质上还是轮询,而 Comet 是在 HTTP 长连接的基础上做了一些 hack,但是它们的实时性不高,另外频繁的请求会给服务器带来压力,也会浪费网络流量和带宽。于是 HTML5 推出了 WebSocket 标准,使得浏览器和服务器之间任何一方都可以主动发消息给对方,这样服务器有新数据时可以主动推送给浏览器。

今天我会介绍 WebSocket 的工作原理,以及作为服务器端的 Tomcat 是如何支持 WebSocket 的。更重要的是,希望你在学完之后可以灵活地选用 WebSocket 技术来解决实际工作中的问题。

WebSocket 工作原理

WebSocket 的名字里带有 Socket,那 Socket 是什么呢?网络上的两个程序通过一个双向链路进行通信,这个双向链路的一端称为一个 Socket。一个 Socket 对应一个 IP 地址和端口号,应用程序通常通过 Socket 向网络发出请求或者应答网络请求。Socket 不是协议,它其实是对 TCP/IP 协议层抽象出来的 API。

但 WebSocket 不是一套 API,跟 HTTP 协议一样,WebSocket 也是一个应用层协议。为了跟现有的 HTTP 协议保持兼容,它通过 HTTP 协议进行一次握手,握手之后数据就直接从 TCP 层的 Socket 传输,就与 HTTP 协议无关了。浏览器发给服务端的请求会带上跟 WebSocket 有关的请求头,比如Connection: UpgradeUpgrade: websocket

如果服务器支持 WebSocket,同样会在 HTTP 响应里加上 WebSocket 相关的 HTTP 头部。

这样 WebSocket 连接就建立好了,接下来 WebSocket 的数据传输会以 frame 形式传输,会将一条消息分为几个 frame,按照先后顺序传输出去。这样做的好处有:

  • 大数据的传输可以分片传输,不用考虑数据大小的问题。
  • 和 HTTP 的 chunk 一样,可以边生成数据边传输,提高传输效率。

Tomcat 如何支持 WebSocket

在讲 Tomcat 如何支持 WebSocket 之前,我们先来开发一个简单的聊天室程序,需求是:用户可以通过浏览器加入聊天室、发送消息,聊天室的其他人都可以收到消息。

WebSocket 聊天室程序

浏览器端 JavaScript 核心代码如下:

var Chat = {};
Chat.socket = null;
Chat.connect = (function(host) {
 
    // 判断当前浏览器是否支持 WebSocket
    if ('WebSocket' in window) {
        // 如果支持则创建 WebSocket JS 类
        Chat.socket = new WebSocket(host);
    } else if ('MozWebSocket' in window) {
        Chat.socket = new MozWebSocket(host);
    } else {
        Console.log('WebSocket is not supported by this browser.');
        return;
    }
 
    // 回调函数,当和服务器的 WebSocket 连接建立起来后,浏览器会回调这个方法
    Chat.socket.onopen = function () {
        Console.log('Info: WebSocket connection opened.');
        document.getElementById('chat').onkeydown = function(event) {
            if (event.keyCode == 13) {
                Chat.sendMessage();
            }
        };
    };
 
    // 回调函数,当和服务器的 WebSocket 连接关闭后,浏览器会回调这个方法
    Chat.socket.onclose = function () {
        document.getElementById('chat').onkeydown = null;
        Console.log('Info: WebSocket closed.');
    };
 
    // 回调函数,当服务器有新消息发送到浏览器,浏览器会回调这个方法
    Chat.socket.onmessage = function (message) {
        Console.log(message.data);
    };
});

上面的代码实现逻辑比较清晰,就是创建一个 WebSocket JavaScript 对象,然后实现了几个回调方法:onopen、onclose 和 onmessage。当连接建立、关闭和有新消息时,浏览器会负责调用这些回调方法。我们再来看服务器端 Tomcat 的实现代码:

//Tomcat 端的实现类加上 @ServerEndpoint 注解,里面的 value 是 URL 路径
@ServerEndpoint(value = "/websocket/chat")
public class ChatEndpoint {
 
    private static final String GUEST_PREFIX = "Guest";
    
    // 记录当前有多少个用户加入到了聊天室,它是 static 全局变量。为了多线程安全使用原子变量 AtomicInteger
    private static final AtomicInteger connectionIds = new AtomicInteger(0);
    
    // 每个用户用一个 CharAnnotation 实例来维护,请你注意它是一个全局的 static 变量,所以用到了线程安全的 CopyOnWriteArraySet
    private static final Set<ChatEndpoint> connections =
            new CopyOnWriteArraySet<>();
 
    private final String nickname;
    private Session session;
 
    public ChatEndpoint() {
        nickname = GUEST_PREFIX + connectionIds.getAndIncrement();
    }
 
    // 新连接到达时,Tomcat 会创建一个 Session,并回调这个函数
    @OnOpen
    public void start(Session session) {
        this.session = session;
        connections.add(this);
        String message = String.format("* %s %s", nickname, "has joined.");
        broadcast(message);
    }
 
   // 浏览器关闭连接时,Tomcat 会回调这个函数
    @OnClose
    public void end() {
        connections.remove(this);
        String message = String.format("* %s %s",
                nickname, "has disconnected.");
        broadcast(message);
    }
 
    // 浏览器发送消息到服务器时,Tomcat 会回调这个函数
    @OnMessage
    public void incoming(String message) {
        // Never trust the client
        String filteredMessage = String.format("%s: %s",
                nickname, HTMLFilter.filter(message.toString()));
        broadcast(filteredMessage);
    }
 
    //Websocket 连接出错时,Tomcat 会回调这个函数
    @OnError
    public void onError(Throwable t) throws Throwable {
        log.error("Chat Error: " + t.toString(), t);
    }
 
    // 向聊天室中的每个用户广播消息
    private static void broadcast(String msg) {
        for (ChatAnnotation client : connections) {
            try {
                synchronized (client) {
                    client.session.getBasicRemote().sendText(msg);
                }
            } catch (IOException e) {
              ...
            }
        }
    }
}

根据 Java WebSocket 规范的规定,Java WebSocket 应用程序由一系列的 WebSocket Endpoint 组成。Endpoint 是一个 Java 对象,代表 WebSocket 连接的一端,就好像处理 HTTP 请求的 Servlet 一样,你可以把它看作是处理 WebSocket 消息的接口。跟 Servlet 不同的地方在于,Tomcat 会给每一个 WebSocket 连接创建一个 Endpoint 实例。你可以通过两种方式定义和实现 Endpoint。

第一种方法是编程式的,就是编写一个 Java 类继承javax.websocket.Endpoint,并实现它的 onOpen、onClose 和 onError 方法。这些方法跟 Endpoint 的生命周期有关,Tomcat 负责管理 Endpoint 的生命周期并调用这些方法。并且当浏览器连接到一个 Endpoint 时,Tomcat 会给这个连接创建一个唯一的 Session(javax.websocket.Session)。Session 在 WebSocket 连接握手成功之后创建,并在连接关闭时销毁。当触发 Endpoint 各个生命周期事件时,Tomcat 会将当前 Session 作为参数传给 Endpoint 的回调方法,因此一个 Endpoint 实例对应一个 Session,我们通过在 Session 中添加 MessageHandler 消息处理器来接收消息,MessageHandler 中定义了 onMessage 方法。在这里 Session 的本质是对 Socket 的封装,Endpoint 通过它与浏览器通信。

第二种定义 Endpoint 的方法是注解式的,也就是上面的聊天室程序例子中用到的方式,即实现一个业务类并给它添加 WebSocket 相关的注解。首先我们注意到@ServerEndpoint(value = "/websocket/chat")注解,它表明当前业务类 ChatEndpoint 是一个实现了 WebSocket 规范的 Endpoint,并且注解的 value 值表明 ChatEndpoint 映射的 URL 是/websocket/chat。我们还看到 ChatEndpoint 类中有@OnOpen@OnClose@OnError和在@OnMessage注解的方法,从名字你就知道它们的功能是什么。

对于程序员来说,其实我们只需要专注具体的 Endpoint 的实现,比如在上面聊天室的例子中,为了方便向所有人群发消息,ChatEndpoint 在内部使用了一个全局静态的集合 CopyOnWriteArraySet 来维护所有的 ChatEndpoint 实例,因为每一个 ChatEndpoint 实例对应一个 WebSocket 连接,也就是代表了一个加入聊天室的用户。当某个 ChatEndpoint 实例收到来自浏览器的消息时,这个 ChatEndpoint 会向集合中其他 ChatEndpoint 实例背后的 WebSocket 连接推送消息。

那么这个过程中,Tomcat 主要做了哪些事情呢?简单来说就是两件事情:Endpoint 加载和 WebSocket 请求处理。下面我分别来详细说说 Tomcat 是如何做这两件事情的。

WebSocket 加载

Tomcat 的 WebSocket 加载是通过 SCI 机制完成的。SCI 全称 ServletContainerInitializer,是 Servlet 3.0 规范中定义的用来接收 Web 应用启动事件的接口。那为什么要监听 Servlet 容器的启动事件呢?因为这样我们有机会在 Web 应用启动时做一些初始化工作,比如 WebSocket 需要扫描和加载 Endpoint 类。SCI 的使用也比较简单,将实现 ServletContainerInitializer 接口的类增加 HandlesTypes 注解,并且在注解内指定的一系列类和接口集合。比如 Tomcat 为了扫描和加载 Endpoint 而定义的 SCI 类如下:

@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class, Endpoint.class})
public class WsSci implements ServletContainerInitializer {
  
  public void onStartup(Set<Class<?>> clazzes, ServletContext ctx) throws ServletException {
  ...
  }
}

一旦定义好了 SCI,Tomcat 在启动阶段扫描类时,会将 HandlesTypes 注解中指定的类都扫描出来,作为 SCI 的 onStartup 方法的参数,并调用 SCI 的 onStartup 方法。注意到 WsSci 的 HandlesTypes 注解中定义了ServerEndpoint.classServerApplicationConfig.classEndpoint.class,因此在 Tomcat 的启动阶段会将这些类的类实例(注意不是对象实例)传递给 WsSci 的 onStartup 方法。那么 WsSci 的 onStartup 方法又做了什么事呢?

它会构造一个 WebSocketContainer 实例,你可以把 WebSocketContainer 理解成一个专门处理 WebSocket 请求的Endpoint 容器。也就是说 Tomcat 会把扫描到的 Endpoint 子类和添加了注解@ServerEndpoint的类注册到这个容器中,并且这个容器还维护了 URL 到 Endpoint 的映射关系,这样通过请求 URL 就能找到具体的 Endpoint 来处理 WebSocket 请求。

WebSocket 请求处理

在讲 WebSocket 请求处理之前,我们先来回顾一下 Tomcat 连接器的组件图。

你可以看到 Tomcat 用 ProtocolHandler 组件屏蔽应用层协议的差异,其中 ProtocolHandler 中有两个关键组件:Endpoint 和 Processor。需要注意,这里的 Endpoint 跟上文提到的 WebSocket 中的 Endpoint 完全是两回事,连接器中的 Endpoint 组件用来处理 I/O 通信。WebSocket 本质就是一个应用层协议,因此不能用 HttpProcessor 来处理 WebSocket 请求,而要用专门 Processor 来处理,而在 Tomcat 中这样的 Processor 叫作 UpgradeProcessor。

为什么叫 Upgrade Processor 呢?这是因为 Tomcat 是将 HTTP 协议升级成 WebSocket 协议的,我们知道 WebSocket 是通过 HTTP 协议来进行握手的,因此当 WebSocket 的握手请求到来时,HttpProtocolHandler 首先接收到这个请求,在处理这个 HTTP 请求时,Tomcat 通过一个特殊的 Filter 判断该当前 HTTP 请求是否是一个 WebSocket Upgrade 请求(即包含Upgrade: websocket的 HTTP 头信息),如果是,则在 HTTP 响应里添加 WebSocket 相关的响应头信息,并进行协议升级。具体来说就是用 UpgradeProtocolHandler 替换当前的 HttpProtocolHandler,相应的,把当前 Socket 的 Processor 替换成 UpgradeProcessor,同时 Tomcat 会创建 WebSocket Session 实例和 Endpoint 实例,并跟当前的 WebSocket 连接一一对应起来。这个 WebSocket 连接不会立即关闭,并且在请求处理中,不再使用原有的 HttpProcessor,而是用专门的 UpgradeProcessor,UpgradeProcessor 最终会调用相应的 Endpoint 实例来处理请求。下面我们通过一张图来理解一下。

你可以看到,Tomcat 对 WebSocket 请求的处理没有经过 Servlet 容器,而是通过 UpgradeProcessor 组件直接把请求发到 ServerEndpoint 实例,并且 Tomcat 的 WebSocket 实现不需要关注具体 I/O 模型的细节,从而实现了与具体 I/O 方式的解耦。

本期精华

WebSocket 技术实现了 Tomcat 与浏览器的双向通信,Tomcat 可以主动向浏览器推送数据,可以用来实现对数据实时性要求比较高的应用。这需要浏览器和 Web 服务器同时支持 WebSocket 标准,Tomcat 启动时通过 SCI 技术来扫描和加载 WebSocket 的处理类 ServerEndpoint,并且建立起了 URL 到 ServerEndpoint 的映射关系。

当第一个 WebSocket 请求到达时,Tomcat 将 HTTP 协议升级成 WebSocket 协议,并将该 Socket 连接的 Processor 替换成 UpgradeProcessor。这个 Socket 不会立即关闭,对接下来的请求,Tomcat 通过 UpgradeProcessor 直接调用相应的 ServerEndpoint 来处理。

今天我讲了可以通过两种方式来开发 WebSocket 应用,一种是继承javax.websocket.Endpoint,另一种通过 WebSocket 相关的注解。其实你还可以通过 Spring 来实现 WebSocket 应用,有兴趣的话你可以去研究一下 Spring WebSocket 的原理。

19 | 比较:Jetty的线程策略EatWhatYouKill

我在前面的专栏里介绍了 Jetty 的总体架构设计,简单回顾一下,Jetty 总体上是由一系列 Connector、一系列 Handler 和一个 ThreadPool 组成,它们的关系如下图所示:

相比较 Tomcat 的连接器,Jetty 的 Connector 在设计上有自己的特点。Jetty 的 Connector 支持 NIO 通信模型,我们知道NIO 模型中的主角就是 Selector,Jetty 在 Java 原生 Selector 的基础上封装了自己的 Selector,叫作 ManagedSelector。ManagedSelector 在线程策略方面做了大胆尝试,将 I/O 事件的侦测和处理放到同一个线程来处理,充分利用了 CPU 缓存并减少了线程上下文切换的开销。

具体的数字是,根据 Jetty 的官方测试,这种名为“EatWhatYouKill”的线程策略将吞吐量提高了 8 倍。你一定很好奇它是如何实现的吧,今天我们就来看一看这背后的原理是什么。

Selector 编程的一般思路

常规的 NIO 编程思路是,将 I/O 事件的侦测和请求的处理分别用不同的线程处理。具体过程是:

启动一个线程,在一个死循环里不断地调用 select 方法,检测 Channel 的 I/O 状态,一旦 I/O 事件达到,比如数据就绪,就把该 I/O 事件以及一些数据包装成一个 Runnable,将 Runnable 放到新线程中去处理。

在这个过程中按照职责划分,有两个线程在干活,一个是 I/O 事件检测线程,另一个是 I/O 事件处理线程。我们仔细思考一下这两者的关系,其实它们是生产者和消费者的关系。I/O 事件侦测线程作为生产者,负责“生产”I/O 事件,也就是负责接活儿的老板;I/O 处理线程是消费者,它“消费”并处理 I/O 事件,就是干苦力的员工。把这两个工作用不同的线程来处理,好处是它们互不干扰和阻塞对方。

Jetty 中的 Selector 编程

然而世事无绝对,将 I/O 事件检测和业务处理这两种工作分开的思路也有缺点。当 Selector 检测读就绪事件时,数据已经被拷贝到内核中的缓存了,同时 CPU 的缓存中也有这些数据了,我们知道 CPU 本身的缓存比内存快多了,这时当应用程序去读取这些数据时,如果用另一个线程去读,很有可能这个读线程使用另一个 CPU 核,而不是之前那个检测数据就绪的 CPU 核,这样 CPU 缓存中的数据就用不上了,并且线程切换也需要开销。

因此 Jetty 的 Connector 做了一个大胆尝试,那就是用把 I/O 事件的生产和消费放到同一个线程来处理,如果这两个任务由同一个线程来执行,如果执行过程中线程不阻塞,操作系统会用同一个 CPU 核来执行这两个任务,这样就能利用 CPU 缓存了。那具体是如何做的呢,我们还是来详细分析一下 Connector 中的 ManagedSelector 组件。

ManagedSelector

ManagedSelector 的本质就是一个 Selector,负责 I/O 事件的检测和分发。为了方便使用,Jetty 在 Java 原生的 Selector 上做了一些扩展,就变成了 ManagedSelector,我们先来看看它有哪些成员变量:

public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
    // 原子变量,表明当前的 ManagedSelector 是否已经启动
    private final AtomicBoolean _started = new AtomicBoolean(false);
    
    // 表明是否阻塞在 select 调用上
    private boolean _selecting = false;
    
    // 管理器的引用,SelectorManager 管理若干 ManagedSelector 的生命周期
    private final SelectorManager _selectorManager;
    
    //ManagedSelector 不止一个,为它们每人分配一个 id
    private final int _id;
    
    // 关键的执行策略,生产者和消费者是否在同一个线程处理由它决定
    private final ExecutionStrategy _strategy;
    
    //Java 原生的 Selector
    private Selector _selector;
    
    //"Selector 更新任务 " 队列
    private Deque<SelectorUpdate> _updates = new ArrayDeque<>();
    private Deque<SelectorUpdate> _updateable = new ArrayDeque<>();
    
    ...
}

这些成员变量中其他的都好理解,就是“Selector 更新任务”队列_updates和执行策略_strategy可能不是很直观。

SelectorUpdate 接口

为什么需要一个“Selector 更新任务”队列呢,对于 Selector 的用户来说,我们对 Selector 的操作无非是将 Channel 注册到 Selector 或者告诉 Selector 我对什么 I/O 事件感兴趣,那么这些操作其实就是对 Selector 状态的更新,Jetty 把这些操作抽象成 SelectorUpdate 接口。

/**
 * A selector update to be done when the selector has been woken.
 */
public interface SelectorUpdate
{
    void update(Selector selector);
}

这意味着如果你不能直接操作 ManageSelector 中的 Selector,而是需要向 ManagedSelector 提交一个任务类,这个类需要实现 SelectorUpdate 接口 update 方法,在 update 方法里定义你想要对 ManagedSelector 做的操作。

比如 Connector 中 Endpoint 组件对读就绪事件感兴趣,它就向 ManagedSelector 提交了一个内部任务类 ManagedSelector.SelectorUpdate:

_selector.submit(_updateKeyAction);

这个_updateKeyAction就是一个 SelectorUpdate 实例,它的 update 方法实现如下:

private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate()
{
    @Override
    public void update(Selector selector)
    {
        // 这里的 updateKey 其实就是调用了 SelectionKey.interestOps(OP_READ);
        updateKey();
    }
};

我们看到在 update 方法里,调用了 SelectionKey 类的 interestOps 方法,传入的参数是OP_READ,意思是现在我对这个 Channel 上的读就绪事件感兴趣了。

那谁来负责执行这些 update 方法呢,答案是 ManagedSelector 自己,它在一个死循环里拉取这些 SelectorUpdate 任务类逐个执行。

Selectable 接口

那 I/O 事件到达时,ManagedSelector 怎么知道应该调哪个函数来处理呢?其实也是通过一个任务类接口,这个接口就是 Selectable,它返回一个 Runnable,这个 Runnable 其实就是 I/O 事件就绪时相应的处理逻辑。

public interface Selectable
{
    // 当某一个 Channel 的 I/O 事件就绪后,ManagedSelector 会调用的回调函数
    Runnable onSelected();
 
    // 当所有事件处理完了之后 ManagedSelector 会调的回调函数,我们先忽略。
    void updateKey();
}

ManagedSelector 在检测到某个 Channel 上的 I/O 事件就绪时,也就是说这个 Channel 被选中了,ManagedSelector 调用这个 Channel 所绑定的附件类的 onSelected 方法来拿到一个 Runnable。

这句话有点绕,其实就是 ManagedSelector 的使用者,比如 Endpoint 组件在向 ManagedSelector 注册读就绪事件时,同时也要告诉 ManagedSelector 在事件就绪时执行什么任务,具体来说就是传入一个附件类,这个附件类需要实现 Selectable 接口。ManagedSelector 通过调用这个 onSelected 拿到一个 Runnable,然后把 Runnable 扔给线程池去执行。

那 Endpoint 的 onSelected 是如何实现的呢?

@Override
public Runnable onSelected()
{
    int readyOps = _key.readyOps();
 
    boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
    boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
 
    // return task to complete the job
    Runnable task= fillable 
            ? (flushable 
                    ? _runCompleteWriteFillable 
                    : _runFillable)
            : (flushable 
                    ? _runCompleteWrite 
                    : null);
 
    return task;
}

上面的代码逻辑很简单,就是读事件到了就读,写事件到了就写。

ExecutionStrategy

铺垫了这么多,终于要上主菜了。前面我主要介绍了 ManagedSelector 的使用者如何跟 ManagedSelector 交互,也就是如何注册 Channel 以及 I/O 事件,提供什么样的处理类来处理 I/O 事件,接下来我们来看看 ManagedSelector 是如何统一管理和维护用户注册的 Channel 集合。再回到今天开始的讨论,ManagedSelector 将 I/O 事件的生产和消费看作是生产者消费者模式,为了充分利用 CPU 缓存,生产和消费尽量放到同一个线程处理,那这是如何实现的呢?Jetty 定义了 ExecutionStrategy 接口:

public interface ExecutionStrategy
{
    // 只在 HTTP2 中用到,简单起见,我们先忽略这个方法。
    public void dispatch();
 
    // 实现具体执行策略,任务生产出来后可能由当前线程执行,也可能由新线程来执行
    public void produce();
    
    // 任务的生产委托给 Producer 内部接口,
    public interface Producer
    {
        // 生产一个 Runnable(任务)
        Runnable produce();
    }
}

我们看到 ExecutionStrategy 接口比较简单,它将具体任务的生产委托内部接口 Producer,而在自己的 produce 方法里来实现具体执行逻辑,也就是生产出来的任务要么由当前线程执行,要么放到新线程中执行。Jetty 提供了一些具体策略实现类:ProduceConsume、ProduceExecuteConsume、ExecuteProduceConsume 和 EatWhatYouKill。它们的区别是:

  • ProduceConsume:任务生产者自己依次生产和执行任务,对应到 NIO 通信模型就是用一个线程来侦测和处理一个 ManagedSelector 上所有的 I/O 事件,后面的 I/O 事件要等待前面的 I/O 事件处理完,效率明显不高。通过图来理解,图中绿色表示生产一个任务,蓝色表示执行这个任务。

  • ProduceExecuteConsume:任务生产者开启新线程来运行任务,这是典型的 I/O 事件侦测和处理用不同的线程来处理,缺点是不能利用 CPU 缓存,并且线程切换成本高。同样我们通过一张图来理解,图中的棕色表示线程切换。

  • ExecuteProduceConsume:任务生产者自己运行任务,但是该策略可能会新建一个新线程以继续生产和执行任务。这种策略也被称为“吃掉你杀的猎物”,它来自狩猎伦理,认为一个人不应该杀死他不吃掉的东西,对应线程来说,不应该生成自己不打算运行的任务。它的优点是能利用 CPU 缓存,但是潜在的问题是如果处理 I/O 事件的业务代码执行时间过长,会导致线程大量阻塞和线程饥饿。

  • EatWhatYouKill:这是 Jetty 对 ExecuteProduceConsume 策略的改良,在线程池线程充足的情况下等同于 ExecuteProduceConsume;当系统比较忙线程不够时,切换成 ProduceExecuteConsume 策略。为什么要这么做呢,原因是 ExecuteProduceConsume 是在同一线程执行 I/O 事件的生产和消费,它使用的线程来自 Jetty 全局的线程池,这些线程有可能被业务代码阻塞,如果阻塞得多了,全局线程池中的线程自然就不够用了,最坏的情况是连 I/O 事件的侦测都没有线程可用了,会导致 Connector 拒绝浏览器请求。于是 Jetty 做了一个优化,在低线程情况下,就执行 ProduceExecuteConsume 策略,I/O 侦测用专门的线程处理,I/O 事件的处理扔给线程池处理,其实就是放到线程池的队列里慢慢处理。

分析了这几种线程策略,我们再来看看 Jetty 是如何实现 ExecutionStrategy 接口的。答案其实就是实现 produce 接口生产任务,一旦任务生产出来,ExecutionStrategy 会负责执行这个任务。

private class SelectorProducer implements ExecutionStrategy.Producer
{
    private Set<SelectionKey> _keys = Collections.emptySet();
    private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
 
    @Override
    public Runnable produce()
    {
        while (true)
        {
            // 如何 Channel 集合中有 I/O 事件就绪,调用前面提到的 Selectable 接口获取 Runnable, 直接返回给 ExecutionStrategy 去处理
            Runnable task = processSelected();
            if (task != null)
                return task;
            
           // 如果没有 I/O 事件就绪,就干点杂活,看看有没有客户提交了更新 Selector 的任务,就是上面提到的 SelectorUpdate 任务类。
            processUpdates();
            updateKeys();
 
           // 继续执行 select 方法,侦测 I/O 就绪事件
            if (!select())
                return null;
        }
    }
 }

SelectorProducer 是 ManagedSelector 的内部类,SelectorProducer 实现了 ExecutionStrategy 中的 Producer 接口中的 produce 方法,需要向 ExecutionStrategy 返回一个 Runnable。在这个方法里 SelectorProducer 主要干了三件事情

  1. 如果 Channel 集合中有 I/O 事件就绪,调用前面提到的 Selectable 接口获取 Runnable,直接返回给 ExecutionStrategy 去处理。
  2. 如果没有 I/O 事件就绪,就干点杂活,看看有没有客户提交了更新 Selector 上事件注册的任务,也就是上面提到的 SelectorUpdate 任务类。
  3. 干完杂活继续执行 select 方法,侦测 I/O 就绪事件。

本期精华

多线程虽然是提高并发的法宝,但并不是说线程越多越好,CPU 缓存以及线程上下文切换的开销也是需要考虑的。Jetty 巧妙设计了 EatWhatYouKill 的线程策略,尽量用同一个线程侦测 I/O 事件和处理 I/O 事件,充分利用了 CPU 缓存,并减少了线程切换的开销。

20 | 总结:Tomcat和Jetty中的对象池技术

Java 对象,特别是一个比较大、比较复杂的 Java 对象,它们的创建、初始化和 GC 都需要耗费 CPU 和内存资源,为了减少这些开销,Tomcat 和 Jetty 都使用了对象池技术。所谓的对象池技术,就是说一个 Java 对象用完之后把它保存起来,之后再拿出来重复使用,省去了对象创建、初始化和 GC 的过程。对象池技术是典型的以空间换时间的思路。

由于维护对象池本身也需要资源的开销,不是所有场景都适合用对象池。如果你的 Java 对象数量很多并且存在的时间比较短,对象本身又比较大比较复杂,对象初始化的成本比较高,这样的场景就适合用对象池技术。比如 Tomcat 和 Jetty 处理 HTTP 请求的场景就符合这个特征,请求的数量很多,为了处理单个请求需要创建不少的复杂对象(比如 Tomcat 连接器中 SocketWrapper 和 SocketProcessor),而且一般来说请求处理的时间比较短,一旦请求处理完毕,这些对象就需要被销毁,因此这个场景适合对象池技术。

Tomcat 的 SynchronizedStack

Tomcat 用 SynchronizedStack 类来实现对象池,下面我贴出它的关键代码来帮助你理解。

public class SynchronizedStack<T> {
 
    // 内部维护一个对象数组, 用数组实现栈的功能
    private Object[] stack;
 
    // 这个方法用来归还对象,用 synchronized 进行线程同步
    public synchronized boolean push(T obj) {
        index++;
        if (index == size) {
            if (limit == -1 || size < limit) {
                expand();// 对象不够用了,扩展对象数组
            } else {
                index--;
                return false;
            }
        }
        stack[index] = obj;
        return true;
    }
    
    // 这个方法用来获取对象
    public synchronized T pop() {
        if (index == -1) {
            return null;
        }
        T result = (T) stack[index];
        stack[index--] = null;
        return result;
    }
    
    // 扩展对象数组长度,以 2 倍大小扩展
    private void expand() {
      int newSize = size * 2;
      if (limit != -1 && newSize > limit) {
          newSize = limit;
      }
      // 扩展策略是创建一个数组长度为原来两倍的新数组
      Object[] newStack = new Object[newSize];
      // 将老数组对象引用复制到新数组
      System.arraycopy(stack, 0, newStack, 0, size);
      // 将 stack 指向新数组,老数组可以被 GC 掉了
      stack = newStack;
      size = newSize;
   }
}

这个代码逻辑比较清晰,主要是 SynchronizedStack 内部维护了一个对象数组,并且用数组来实现栈的接口:push 和 pop 方法,这两个方法分别用来归还对象和获取对象。你可能好奇为什么 Tomcat 使用一个看起来比较简单的 SynchronizedStack 来做对象容器,为什么不使用高级一点的并发容器比如 ConcurrentLinkedQueue 呢?

这是因为 SynchronizedStack 用数组而不是链表来维护对象,可以减少结点维护的内存开销,并且它本身只支持扩容不支持缩容,也就是说数组对象在使用过程中不会被重新赋值,也就不会被 GC。这样设计的目的是用最低的内存和 GC 的代价来实现无界容器,同时 Tomcat 的最大同时请求数是有限制的,因此不需要担心对象的数量会无限膨胀。

Jetty 的 ByteBufferPool

我们再来看 Jetty 中的对象池 ByteBufferPool,它本质是一个 ByteBuffer 对象池。当 Jetty 在进行网络数据读写时,不需要每次都在 JVM 堆上分配一块新的 Buffer,只需在 ByteBuffer 对象池里拿到一块预先分配好的 Buffer,这样就避免了频繁的分配内存和释放内存。这种设计你同样可以在高性能通信中间件比如 Mina 和 Netty 中看到。ByteBufferPool 是一个接口:

public interface ByteBufferPool
{
    public ByteBuffer acquire(int size, boolean direct);
 
    public void release(ByteBuffer buffer);
}

接口中的两个方法:acquire 和 release 分别用来分配和释放内存,并且你可以通过 acquire 方法的 direct 参数来指定 buffer 是从 JVM 堆上分配还是从本地内存分配。ArrayByteBufferPool 是 ByteBufferPool 的实现类,我们先来看看它的成员变量和构造函数:

public class ArrayByteBufferPool implements ByteBufferPool
{
    private final int _min;// 最小 size 的 Buffer 长度
    private final int _maxQueue;//Queue 最大长度
    
    // 用不同的 Bucket(桶) 来持有不同 size 的 ByteBuffer 对象, 同一个桶中的 ByteBuffer size 是一样的
    private final ByteBufferPool.Bucket[] _direct;
    private final ByteBufferPool.Bucket[] _indirect;
    
    //ByteBuffer 的 size 增量
    private final int _inc;
    
    public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue)
    {
        // 检查参数值并设置默认值
        if (minSize<=0)//ByteBuffer 的最小长度
            minSize=0;
        if (increment<=0)
            increment=1024;// 默认以 1024 递增
        if (maxSize<=0)
            maxSize=64*1024;//ByteBuffer 的最大长度默认是 64K
        
        //ByteBuffer 的最小长度必须小于增量
        if (minSize>=increment) 
            throw new IllegalArgumentException("minSize >= increment");
            
        // 最大长度必须是增量的整数倍
        if ((maxSize%increment)!=0 || increment>=maxSize)
            throw new IllegalArgumentException("increment must be a divisor of maxSize");
         
        _min=minSize;
        _inc=increment;
        
        // 创建 maxSize/increment 个桶, 包含直接内存的与 heap 的
        _direct=new ByteBufferPool.Bucket[maxSize/increment];
        _indirect=new ByteBufferPool.Bucket[maxSize/increment];
        _maxQueue=maxQueue;
        int size=0;
        for (int i=0;i<_direct.length;i++)
        {
          size+=_inc;
          _direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
          _indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
        }
    }
}

从上面的代码我们看到,ByteBufferPool 是用不同的桶(Bucket)来管理不同长度的 ByteBuffer,因为我们可能需要分配一块 1024 字节的 Buffer,也可能需要一块 64K 字节的 Buffer。而桶的内部用一个 ConcurrentLinkedDeque 来放置 ByteBuffer 对象的引用。

private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();

你可以通过下面的图再来理解一下:

而 Buffer 的分配和释放过程,就是找到相应的桶,并对桶中的 Deque 做出队和入队的操作,而不是直接向 JVM 堆申请和释放内存。

// 分配 Buffer
public ByteBuffer acquire(int size, boolean direct)
{
    // 找到对应的桶,没有的话创建一个桶
    ByteBufferPool.Bucket bucket = bucketFor(size,direct);
    if (bucket==null)
        return newByteBuffer(size,direct);
    // 这里其实调用了 Deque 的 poll 方法
    return bucket.acquire(direct);
        
}
 
// 释放 Buffer
public void release(ByteBuffer buffer)
{
    if (buffer!=null)
    {
      // 找到对应的桶
      ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
      
      // 这里调用了 Deque 的 offerFirst 方法
  if (bucket!=null)
      bucket.release(buffer);
    }
}

对象池的思考

对象池作为全局资源,高并发环境中多个线程可能同时需要获取对象池中的对象,因此多个线程在争抢对象时会因为锁竞争而阻塞, 因此使用对象池有线程同步的开销,而不使用对象池则有创建和销毁对象的开销。对于对象池本身的设计来说,需要尽量做到无锁化,比如 Jetty 就使用了 ConcurrentLinkedDeque。如果你的内存足够大,可以考虑用线程本地(ThreadLocal)对象池,这样每个线程都有自己的对象池,线程之间互不干扰。

为了防止对象池的无限膨胀,必须要对池的大小做限制。对象池太小发挥不了作用,对象池太大的话可能有空闲对象,这些空闲对象会一直占用内存,造成内存浪费。这里你需要根据实际情况做一个平衡,因此对象池本身除了应该有自动扩容的功能,还需要考虑自动缩容。

所有的池化技术,包括缓存,都会面临内存泄露的问题,原因是对象池或者缓存的本质是一个 Java 集合类,比如 List 和 Stack,这个集合类持有缓存对象的引用,只要集合类不被 GC,缓存对象也不会被 GC。维持大量的对象也比较占用内存空间,所以必要时我们需要主动清理这些对象。以 Java 的线程池 ThreadPoolExecutor 为例,它提供了 allowCoreThreadTimeOut 和 setKeepAliveTime 两种方法,可以在超时后销毁线程,我们在实际项目中也可以参考这个策略。

另外在使用对象池时,我这里还有一些小贴士供你参考:

  • 对象在用完后,需要调用对象池的方法将对象归还给对象池。
  • 对象池中的对象在再次使用时需要重置,否则会产生脏对象,脏对象可能持有上次使用的引用,导致内存泄漏等问题,并且如果脏对象下一次使用时没有被清理,程序在运行过程中会发生意想不到的问题。
  • 对象一旦归还给对象池,使用者就不能对它做任何操作了。
  • 向对象池请求对象时有可能出现的阻塞、异常或者返回 null 值,这些都需要我们做一些额外的处理,来确保程序的正常运行。

本期精华

Tomcat 和 Jetty 都用到了对象池技术,这是因为处理一次 HTTP 请求的时间比较短,但是这个过程中又需要创建大量复杂对象。

对象池技术可以减少频繁创建和销毁对象带来的成本,实现对象的缓存和复用。如果你的系统需要频繁的创建和销毁对象,并且对象的创建代价比较大,这种情况下,一般来说你会观察到 GC 的压力比较大,占用 CPU 率比较高,这个时候你就可以考虑使用对象池了。

还有一种情况是你需要对资源的使用做限制,比如数据库连接,不能无限制地创建数据库连接,因此就有了数据库连接池,你也可以考虑把一些关键的资源池化,对它们进行统一管理,防止滥用。

21 | 总结:Tomcat和Jetty的高性能、高并发之道

高性能程序就是高效的利用 CPU、内存、网络和磁盘等资源,在短时间内处理大量的请求。那如何衡量“短时间和大量”呢?其实就是两个关键指标:响应时间和每秒事务处理量(TPS)。

那什么是资源的高效利用呢? 我觉得有两个原则:

  1. 减少资源浪费。比如尽量避免线程阻塞,因为一阻塞就会发生线程上下文切换,就需要耗费 CPU 资源;再比如网络通信时数据从内核空间拷贝到 Java 堆内存,需要通过本地内存中转。
  2. 当某种资源成为瓶颈时,用另一种资源来换取。比如缓存和对象池技术就是用内存换 CPU;数据压缩后再传输就是用 CPU 换网络。

Tomcat 和 Jetty 中用到了大量的高性能、高并发的设计,我总结了几点:I/O 和线程模型、减少系统调用、池化、零拷贝、高效的并发编程。下面我会详细介绍这些设计,希望你也可以将这些技术用到实际的工作中去。

I/O 和线程模型

I/O 模型的本质就是为了缓解 CPU 和外设之间的速度差。当线程发起 I/O 请求时,比如读写网络数据,网卡数据还没准备好,这个线程就会被阻塞,让出 CPU,也就是说发生了线程切换。而线程切换是无用功,并且线程被阻塞后,它持有内存资源并没有释放,阻塞的线程越多,消耗的内存就越大,因此 I/O 模型的目标就是尽量减少线程阻塞。Tomcat 和 Jetty 都已经抛弃了传统的同步阻塞 I/O,采用了非阻塞 I/O 或者异步 I/O,目的是业务线程不需要阻塞在 I/O 等待上。

除了 I/O 模型,线程模型也是影响性能和并发的关键点。Tomcat 和 Jetty 的总体处理原则是:

  • 连接请求由专门的 Acceptor 线程组处理。
  • I/O 事件侦测也由专门的 Selector 线程组来处理。
  • 具体的协议解析和业务处理可能交给线程池(Tomcat),或者交给 Selector 线程来处理(Jetty)。

将这些事情分开的好处是解耦,并且可以根据实际情况合理设置各部分的线程数。这里请你注意,线程数并不是越多越好,因为 CPU 核的个数有限,线程太多也处理不过来,会导致大量的线程上下文切换。

减少系统调用

其实系统调用是非常耗资源的一个过程,涉及 CPU 从用户态切换到内核态的过程,因此我们在编写程序的时候要有意识尽量避免系统调用。比如在 Tomcat 和 Jetty 中,系统调用最多的就是网络通信操作了,一个 Channel 上的 write 就是系统调用,为了降低系统调用的次数,最直接的方法就是使用缓冲,当输出数据达到一定的大小才 flush 缓冲区。Tomcat 和 Jetty 的 Channel 都带有输入输出缓冲区。

还有值得一提的是,Tomcat 和 Jetty 在解析 HTTP 协议数据时, 都采取了延迟解析的策略,HTTP 的请求体(HTTP Body)直到用的时候才解析。也就是说,当 Tomcat 调用 Servlet 的 service 方法时,只是读取了和解析了 HTTP 请求头,并没有读取 HTTP 请求体。

直到你的 Web 应用程序调用了 ServletRequest 对象的 getInputStream 方法或者 getParameter 方法时,Tomcat 才会去读取和解析 HTTP 请求体中的数据;这意味着如果你的应用程序没有调用上面那两个方法,HTTP 请求体的数据就不会被读取和解析,这样就省掉了一次 I/O 系统调用。

池化、零拷贝

关于池化和零拷贝,我在专栏前面已经详细讲了它们的原理,你可以回过头看看专栏第 20 期第 16 期。其实池化的本质就是用内存换 CPU;而零拷贝就是不做无用功,减少资源浪费。

高效的并发编程

我们知道并发的过程中为了同步多个线程对共享变量的访问,需要加锁来实现。而锁的开销是比较大的,拿锁的过程本身就是个系统调用,如果锁没拿到线程会阻塞,又会发生线程上下文切换,尤其是大量线程同时竞争一把锁时,会浪费大量的系统资源。因此作为程序员,要有意识的尽量避免锁的使用,比如可以使用原子类 CAS 或者并发集合来代替。如果万不得已需要用到锁,也要尽量缩小锁的范围和锁的强度。接下来我们来看看 Tomcat 和 Jetty 如何做到高效的并发编程的。

缩小锁的范围

缩小锁的范围,其实就是不直接在方法上加 synchronized,而是使用细粒度的对象锁。

protected void startInternal() throws LifecycleException {
 
    setState(LifecycleState.STARTING);
 
    // 锁 engine 成员变量
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }
 
   // 锁 executors 成员变量
    synchronized (executors) {
        for (Executor executor: executors) {
            executor.start();
        }
    }
 
    mapperListener.start();
 
    // 锁 connectors 成员变量
    synchronized (connectorsLock) {
        for (Connector connector: connectors) {
            // If it has already failed, don't try and start it
            if (connector.getState() != LifecycleState.FAILED) {
                connector.start();
            }
        }
    }
}

比如上面的代码是 Tomcat 的 StandardService 组件的启动方法,这个启动方法要启动三种子组件:engine、executors 和 connectors。它没有直接在方法上加锁,而是用了三把细粒度的锁,来分别用来锁三个成员变量。如果直接在方法上加 synchronized,多个线程执行到这个方法时需要排队;而在对象级别上加 synchronized,多个线程可以并行执行这个方法,只是在访问某个成员变量时才需要排队。

用原子变量和 CAS 取代锁

下面的代码是 Jetty 线程池的启动方法,它的主要功能就是根据传入的参数启动相应个数的线程。

private boolean startThreads(int threadsToStart)
{
    while (threadsToStart > 0 && isRunning())
    {
        // 获取当前已经启动的线程数,如果已经够了就不需要启动了
        int threads = _threadsStarted.get();
        if (threads >= _maxThreads)
            return false;
 
        // 用 CAS 方法将线程数加一,请注意执行失败走 continue,继续尝试
        if (!_threadsStarted.compareAndSet(threads, threads + 1))
            continue;
 
        boolean started = false;
        try
        {
            Thread thread = newThread(_runnable);
            thread.setDaemon(isDaemon());
            thread.setPriority(getThreadsPriority());
            thread.setName(_name + "-" + thread.getId());
            _threads.add(thread);//_threads 并发集合
            _lastShrink.set(System.nanoTime());//_lastShrink 是原子变量
            thread.start();
            started = true;
            --threadsToStart;
        }
        finally
        {
            // 如果最终线程启动失败,还需要把线程数减一
            if (!started)
                _threadsStarted.decrementAndGet();
        }
    }
    return true;
}

你可以看到整个函数的实现是一个while 循环,并且是无锁的。_threadsStarted表示当前线程池已经启动了多少个线程,它是一个原子变量 AtomicInteger,首先通过它的 get 方法拿到值,如果线程数已经达到最大值,直接返回。否则尝试用 CAS 操作将_threadsStarted的值加一,如果成功了意味着没有其他线程在改这个值,当前线程可以继续往下执行;否则走 continue 分支,也就是继续重试,直到成功为止。在这里当然你也可以使用锁来实现,但是我们的目的是无锁化。

并发容器的使用

CopyOnWriteArrayList 适用于读多写少的场景,比如 Tomcat 用它来“存放”事件监听器,这是因为监听器一般在初始化过程中确定后就基本不会改变,当事件触发时需要遍历这个监听器列表,所以这个场景符合读多写少的特征。

public abstract class LifecycleBase implements Lifecycle {
 
    // 事件监听器集合
    private final List<LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<>();
    
    ...
}

volatile 关键字的使用

再拿 Tomcat 中的 LifecycleBase 作为例子,它里面的生命状态就是用 volatile 关键字修饰的。volatile 的目的是为了保证一个线程修改了变量,另一个线程能够读到这种变化。对于生命状态来说,需要在各个线程中保持是最新的值,因此采用了 volatile 修饰。

public abstract class LifecycleBase implements Lifecycle {
 
    // 当前组件的生命状态,用 volatile 修饰
    private volatile LifecycleState state = LifecycleState.NEW;
 
}

本期精华

高性能程序能够高效的利用系统资源,首先就是减少资源浪费,比如要减少线程的阻塞,因为阻塞会导致资源闲置和线程上下文切换,Tomcat 和 Jetty 通过合理的 I/O 模型和线程模型减少了线程的阻塞。

另外系统调用会导致用户态和内核态切换的过程,Tomcat 和 Jetty 通过缓存和延迟解析尽量减少系统调用,另外还通过零拷贝技术避免多余的数据拷贝。

高效的利用资源还包括另一层含义,那就是我们在系统设计的过程中,经常会用一种资源换取另一种资源,比如 Tomcat 和 Jetty 中使用的对象池技术,就是用内存换取 CPU,将数据压缩后再传输就是用 CPU 换网络。

除此之外,高效的并发编程也很重要,多线程虽然可以提高并发度,也带来了锁的开销,因此我们在实际编程过程中要尽量避免使用锁,比如可以用原子变量和 CAS 操作来代替锁。如果实在避免不了用锁,也要尽量减少锁的范围和强度,比如可以用细粒度的对象锁或者低强度的读写锁。Tomcat 和 Jetty 的代码也很好的实践了这一理念。

22 | 热点问题答疑(2):内核如何阻塞与唤醒进程?

在专栏的第三个模块,我们学习了 Tomcat 连接器组件的设计,其中最重要的是各种 I/O 模型及其实现。而 I/O 模型跟操作系统密切相关,要彻底理解这些原理,我们首先需要弄清楚什么是进程和线程,什么是虚拟内存和物理内存,什么是用户空间和内核空间,线程的阻塞到底意味着什么,内核又是如何唤醒用户线程的等等这些问题。可以说掌握这些底层的知识,对于你学习 Tomcat 和 Jetty 的原理,乃至其他各种后端架构都至关重要,这些知识可以说是后端开发的“基石”。

在专栏的留言中我也发现很多同学反馈对这些底层的概念很模糊,那今天作为模块的答疑篇,我就来跟你聊聊这些问题。

进程和线程

我们先从 Linux 的进程谈起,操作系统要运行一个可执行程序,首先要将程序文件加载到内存,然后 CPU 去读取和执行程序指令,而一个进程就是“一次程序的运行过程”,内核会给每一个进程创建一个名为task_struct的数据结构,而内核也是一段程序,系统启动时就被加载到内存中了。

进程在运行过程中要访问内存,而物理内存是有限的,比如 16GB,那怎么把有限的内存分给不同的进程使用呢?跟 CPU 的分时共享一样,内存也是共享的,Linux 给每个进程虚拟出一块很大的地址空间,比如 32 位机器上进程的虚拟内存地址空间是 4GB,从 0x00000000 到 0xFFFFFFFF。但这 4GB 并不是真实的物理内存,而是进程访问到了某个虚拟地址,如果这个地址还没有对应的物理内存页,就会产生缺页中断,分配物理内存,MMU(内存管理单元)会将虚拟地址与物理内存页的映射关系保存在页表中,再次访问这个虚拟地址,就能找到相应的物理内存页。每个进程的这 4GB 虚拟地址空间分布如下图所示:

进程的虚拟地址空间总体分为用户空间和内核空间,低地址上的 3GB 属于用户空间,高地址的 1GB 是内核空间,这是基于安全上的考虑,用户程序只能访问用户空间,内核程序可以访问整个进程空间,并且只有内核可以直接访问各种硬件资源,比如磁盘和网卡。那用户程序需要访问这些硬件资源该怎么办呢?答案是通过系统调用,系统调用可以理解为内核实现的函数,比如应用程序要通过网卡接收数据,会调用 Socket 的 read 函数:

ssize_t read(int fd,void *buf,size_t nbyte)

CPU 在执行系统调用的过程中会从用户态切换到内核态,CPU 在用户态下执行用户程序,使用的是用户空间的栈,访问用户空间的内存;当 CPU 切换到内核态后,执行内核代码,使用的是内核空间上的栈。

从上面这张图我们看到,用户空间从低到高依次是代码区、数据区、堆、共享库与 mmap 内存映射区、栈、环境变量。其中堆向高地址增长,栈向低地址增长。

请注意用户空间上还有一个共享库和 mmap 映射区,Linux 提供了内存映射函数 mmap, 它可将文件内容映射到这个内存区域,用户通过读写这段内存,从而实现对文件的读取和修改,无需通过 read/write 系统调用来读写文件,省去了用户空间和内核空间之间的数据拷贝,Java 的 MappedByteBuffer 就是通过它来实现的;用户程序用到的系统共享库也是通过 mmap 映射到了这个区域。

我在开始提到的task_struct结构体本身是分配在内核空间,它的vm_struct成员变量保存了各内存区域的起始和终止地址,此外task_struct中还保存了进程的其他信息,比如进程号、打开的文件、创建的 Socket 以及 CPU 运行上下文等。

在 Linux 中,线程是一个轻量级的进程,轻量级说的是线程只是一个 CPU 调度单元,因此线程有自己的task_struct结构体和运行栈区,但是线程的其他资源都是跟父进程共用的,比如虚拟地址空间、打开的文件和 Socket 等。

阻塞与唤醒

我们知道当用户线程发起一个阻塞式的 read 调用,数据未就绪时,线程就会阻塞,那阻塞具体是如何实现的呢?

Linux 内核将线程当作一个进程进行 CPU 调度,内核维护了一个可运行的进程队列,所有处于TASK_RUNNING状态的进程都会被放入运行队列中,本质是用双向链表将task_struct链接起来,排队使用 CPU 时间片,时间片用完重新调度 CPU。所谓调度就是在可运行进程列表中选择一个进程,再从 CPU 列表中选择一个可用的 CPU,将进程的上下文恢复到这个 CPU 的寄存器中,然后执行进程上下文指定的下一条指令。

而阻塞的本质就是将进程的task_struct移出运行队列,添加到等待队列,并且将进程的状态的置为TASK_UNINTERRUPTIBLE或者TASK_INTERRUPTIBLE,重新触发一次 CPU 调度让出 CPU。

那线程怎么唤醒呢?线程在加入到等待队列的同时向内核注册了一个回调函数,告诉内核我在等待这个 Socket 上的数据,如果数据到了就唤醒我。这样当网卡接收到数据时,产生硬件中断,内核再通过调用回调函数唤醒进程。唤醒的过程就是将进程的task_struct从等待队列移到运行队列,并且将task_struct的状态置为TASK_RUNNING,这样进程就有机会重新获得 CPU 时间片。

这个过程中,内核还会将数据从内核空间拷贝到用户空间的堆上。

当 read 系统调用返回时,CPU 又从内核态切换到用户态,继续执行 read 调用的下一行代码,并且能从用户空间上的 Buffer 读到数据了。

小结

今天我们谈到了一次 Socket read 系统调用的过程:首先 CPU 在用户态执行应用程序的代码,访问进程虚拟地址空间的用户空间;read 系统调用时 CPU 从用户态切换到内核态,执行内核代码,内核检测到 Socket 上的数据未就绪时,将进程的task_struct结构体从运行队列中移到等待队列,并触发一次 CPU 调度,这时进程会让出 CPU;当网卡数据到达时,内核将数据从内核空间拷贝到用户空间的 Buffer,接着将进程的task_struct结构体重新移到运行队列,这样进程就有机会重新获得 CPU 时间片,系统调用返回,CPU 又从内核态切换到用户态,访问用户空间的数据。