Java多线程高并发系列:(九)BIO与NIO

2025-01-29 20:35
375
0

一、阻塞与非阻塞、同步与异步

阻塞与非阻塞是描述进程在访问某个资源时,数据是否准备就绪的的一种处理方式。

  • 阻塞式:当我们没有获取到数据的时候,整个应用可能产生阻塞,放弃CPU执行,无法去做其他事情。应用场景:悲观锁
  • 非阻塞式:不管是否有获取到数据,都立马告诉一个结果,如果没有得到数据的情况下返回一个错误标记,根据错误的标记不断的轮询。应用场景:CAS无锁机制、乐观锁

同步与异步是指访问数据的机制。

  • 同步:一般指主动请求并等待IO操作完成的方式。
  • 异步:指主动请求数据后便可以继续处理其它任务,随后等待IO操作完毕的通知。

以一些现实生活中的示例来理解上面的概念:

1、普通水壶煮水,站在旁边,主动的看水开了没有?这就是:同步的阻塞
2、普通水壶煮水,去干点别的事,每过一段时间去看看水开了没有,水没开就走人。这就是 :同步非阻塞
3、响水壶煮水,站在旁边,不会每过一段时间主动看水开了没有。如果水开了,水壶自动通知他。 这就是:异步阻塞
4、响水壶煮水,去干点别的事,如果水开了,水壶自动通知他。这就是:异步非阻塞

二、BIO(Blocking IO 阻塞式IO)介绍

传统BIO是一种同步的阻塞IO模型,基于字节流和字符流进行操作。IO在进行读写时,该线程将被阻塞,线程无法进行其它操作,直到发生以下情况:1、有数据可以读取。2、数据读取完成。3、发生异常。

BIO 基于操作系统提供的底层 I/O 机制实现。在 Java 中,通过InputStream和OutputStream等类来进行输入输出操作。当调用这些流的读取或写入方法时,Java 虚拟机(JVM)会将这些操作转换为底层操作系统的 I/O 系统调用。操作系统负责实际的物理设备(如磁盘、网络等)的数据传输。在数据传输完成之前,线程会被阻塞,等待操作系统返回操作结果。

示例代码:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class BIOExample {
    public static void main(String[] args) {
        // 创建一个BufferedReader对象,用于从控制台读取用户输入
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("请输入内容:");
        try {
            // 从控制台读取一行输入,线程会在此处阻塞,直到用户输入并按下回车键
            String input = reader.readLine();
            // 输出用户输入的内容
            System.out.println("你输入的内容是:" + input);
        } catch (IOException e) {
            // 处理读取过程中可能出现的IO异常
            e.printStackTrace();
        } finally {
            // 关闭BufferedReader,释放资源
            try {
                if (reader!= null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

BIO 虽然简单直观,但在处理高并发或大量 I/O 操作时性能较低,因为每个 I/O 操作都会阻塞线程,可能导致线程资源的大量消耗和系统性能的下降。

对BIO的简单优化可以采用伪异步IO模型:

  • 使用多线程把阻塞后的要执行的业务逻辑放到线程中执行,继续优化可以加上线程池进行复用。
  • 缺点:线程多消耗服务器资源

三、NIO(No Blooking IO 非阻塞式IO)介绍

Java语言在jdk1.4版本推出的一个同步非阻塞IO模型,对原来BIO实现的优化。

NIO与BIO区别:BIO是面向流的,NIO是面向缓冲区的。 

在 Java 7 中,NIO 有了进一步的改进,也就是 NIO 2,引入了异步非阻塞 IO 方式,也有很多人叫它 AIO(Asynchronous IO)。异步 IO 操作基于事件和回调机制,可以简单理解为,应用操作直接返回,而不会阻塞在那里,当后台处理完成,操作系统会通知相应线程进行后续工作。

3.1、NIO的核心组件

3.1.1、Chancel(通道)

数据传输都经过管道,实现非阻塞IO。Channel是一个双向通道,与传统IO操作只允许单向的读写不同的是,Channel允许在一个通道上进行读和写的操作。Channel可以通过配置它的阻塞行为,来实现阻塞和非阻塞式。

常用通道:

  • FileChannel:文件
  • SocketChannel:Socket客户端
  • ServerSocketChannel:Socket服务端
  • DatagramChannel: UDP

3.1.2、Selector(多路复用器)

chancel都会统一注册到Selector选择器,用于监听多个通道的事件,实现统一管理和维护。

Selector与Channel是相互配合使用的,将Channel注册在Selector上之后,才可以正确的使用Selector,但此时Channel必须为非阻塞模式。Selector可以监听Channel的四种状态(Connect、Accept、Read、Write),当监听到某一Channel的某个状态时,才允许对Channel进行相应的操作。

  • 多路:就是多个不同的请求;
  • 复用:一个线程可以维护多个不同的IO操作。
  • 好处:占用CPU资源小,保证线程安全的问题。

Selector的核心组件:

  • Selector:用于监控多个通道的事件。
  • SelectableChannel:可以被注册到 Selector 的通道(如 SocketChannel、ServerSocketChannel)。
  • SelectionKey:表示Channel与 Selector 的注册关系,包含通道和感兴趣的事件。

selectionKey将Channel与Selector建立了关系,并维护了channel事件。可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它.所以在调用某个key时,需要使用isValid进行校验。

selectionKey通过四个常量表示四种事件:

  • OP_ACCEPT:连接可接受操作,仅ServerSocketChannel支持
  • OP_CONNECT:连接操作,Client端支持的一种操作
  • OP_READ/OP_WRITE:读写

Selector的使用步骤:

1、使用 Selector.open() 创建一个 Selector 实例。

Selector selector = Selector.open();

2、创建通道(如 ServerSocketChannel 或 SocketChannel),并将其设置为非阻塞模式。

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080)); // 绑定端口
serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式

3、将通道注册到 Selector,并指定感兴趣的事件类型(如 OP_ACCEPT、OP_READ 等)。

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

4、使用 Selector.select() 方法轮询就绪的事件,并根据 SelectionKey 的事件类型,执行相应的操作。

while (true) {
    int readyChannels = selector.select(); // 阻塞直到有事件就绪
    if (readyChannels == 0) continue;

    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();

        if (key.isAcceptable()) {
            // 处理新连接
        } else if (key.isReadable()) {
            // 处理读事件
        } else if (key.isWritable()) {
            // 处理写事件
        }

        keyIterator.remove(); // 移除已处理的事件
    }
}

Selector 的注意事项:

  • 非阻塞模式:注册到 Selector 的通道必须处于非阻塞模式。
  • 事件处理:处理完事件后,必须从 selectedKeys 集合中移除 SelectionKey,否则会导致重复处理。
  • 线程安全:Selector 不是线程安全的,建议在单线程中使用。

3.1.3、Buffer(缓冲区)

Buffer它是一个缓冲区,实际上也是一个容器,一个连续数组。

NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

为了理解Buffer的工作原理,需要熟悉它的三个属性:capacity、position和limit

position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。见下图:

  • capacity:作为一个内存块,Buffer有固定的大小值,也叫作“capacity”,只能往其中写入capacity个byte、long、char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据。
  • position:当你写数据到Buffer中时,position表示当前的位置。初始的position值为0,当写入一个字节数据到Buffer中后,position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity-1。当读取数据时,也是从某个特定位置读,将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取一个字节数据后,position向前移动到下一个可读的位置。
  • limit:在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

3.1.4、Buffer的操作

Buffer的分配:对Buffer对象的操作必须首先调用分配方法进行分配,Buffer提供一个allocate(int capacity)方法分配一个指定字节大小的对象。

向Buffer中写数据:

写数据到Buffer中有两种方式:
      1、从channel写到Buffer
          int bytes = channel.read(buf); //将channel中的数据读取到buf中
      2、通过Buffer的put()方法写到Buffer
          buf.put(byte); //将数据通过put()方法写入到buf中

 flip()方法:将Buffer从写模式切换到读模式,调用flip()方法会将position设置为0,并将limit设置为之前的position的值。

从Buffer中读数据:

从Buffer中读数据有两种方式:
      1、从Buffer读取数据到Channel
          int bytes = channel.write(buf); //将buf中的数据读取到channel中
      2、通过Buffer的get()方法读取数据
          byte bt = buf.get(); //从buf中读取一个byte

rewind()方法:Buffer.rewind()方法将position设置为0,使得可以重读Buffer中的所有数据,limit保持不变。
 

clear()与compact()方法:一旦读完Buffer中的数据,需要让Buffer准备好再次被写入,可以通过clear()或compact()方法完成。如果调用的是clear()方法,position将被设置为0,limit设置为capacity的值。但是Buffer并未被清空,只是通过这些标记告诉我们可以从哪里开始往Buffer中写入多少数据。如果Buffer中还有一些未读的数据,调用clear()方法将被"遗忘 "。compact()方法将所有未读的数据拷贝到Buffer起始处,然后将position设置到最后一个未读元素的后面,limit属性依然设置为capacity。可以使得Buffer中的未读数据还可以在后续中被使用。
 

mark()与reset()方法:通过调用Buffer.mark()方法可以标记一个特定的position,之后可以通过调用Buffer.reset()恢复到这个position上。

3.2、NIO优点

Java NIO 相较于传统的 Java IO 有诸多优点,主要体现在以下几个方面:

1、性能提升方面

  • 非阻塞 IO 操作:Java NIO 允许进行非阻塞式的 IO 操作,线程在执行 IO 操作时不会被阻塞。
  • 基于缓冲区的批量数据操作:NIO 使用缓冲区来处理数据,数据可以批量地读入缓冲区或从缓冲区写出。

2、资源利用率方面

  • 单线程管理多通道:通过选择器(Selector),一个线程可以同时管理多个通道(Channel)的 IO 事件。
  • 减少内存复制:在 NIO 中,直接缓冲区(Direct Buffer)可以让数据直接在操作系统的物理内存和应用程序之间进行传输,而不需要在 Java 堆内存和操作系统内存之间进行多次复制。

3.3、NIO在其他软件上的应用

1、为什么Redis单线程也能够支持非常高的并发

首先Redis官方没有Windows版本,只有linux版本。

Redis底层采用的Nio中的多路IO复用机制,能够非常好的支持这样的并发,从而保证线程安全问题。

多路IO复用:使用一个线程维护多个不同的IO操作,原理是使用NIO的选择器,将多个不同的Channel统一交给选择器管理。

2、NIO在不同的操作系统上实现方式

在Windows上使用select实现轮询,事件复杂度是O(n),而且还存在空轮询的情况,效率非常低,其次默认对我们轮询的数据有一定限制,所以支持上万的TCP连接是非常难的。

在Linux操作系统上采用epoll实现事件驱动回调,不会存在空轮询的情况,只对活跃的socket连接实现主动回调,这样在性能上大大的提高,所以时间复杂度为O(1)

Windows操作系统上是没有epoll的。

所以为什么nginx、redis能够支持非常高的并发,就是因为liunx中IO多路复用机制:epoll

3.4、NIO示例

下面是一个完整的 Java NIO 示例,展示了如何使用 Selector 和 Buffer 实现一个简单的 Echo 服务器。这个服务器会监听客户端连接,读取客户端发送的数据,并将数据原样返回给客户端。

package myTest.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class NIOEchoServer {

    private static final Map<SocketChannel, StringBuilder> clientData = new HashMap<>();

    public static void main(String[] args) throws IOException {
        // 1. 创建Selector
        Selector selector = Selector.open();

        // 2. 创建ServerSocketChannel并绑定端口
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式

        // 3. 将ServerSocketChannel注册到Selector,监听ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server started on port 8080...");

        while (true) {
            // 4. 阻塞等待就绪的事件
            selector.select();

            // 5. 获取就绪的事件集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    // 6. 处理ACCEPT事件:接受客户端连接
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false); // 设置为非阻塞模式

                    // 7. 将客户端通道注册到Selector,监听READ事件
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    clientData.put(clientChannel, new StringBuilder()); // 初始化客户端数据缓冲区
                    System.out.println("Client connected: " + clientChannel.getRemoteAddress());
                } else if (key.isReadable()) {
                    // 8. 处理READ事件:读取客户端数据
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024); // 创建Buffer

                    int bytesRead = clientChannel.read(buffer);
                    if (bytesRead > 0) {
                        buffer.flip(); // 切换为读模式
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data); // 将数据从Buffer读取到字节数组
                        String message = new String(data);

                        // 9. 将数据追加到客户端的数据缓冲区
                        StringBuilder clientBuffer = clientData.get(clientChannel);
                        clientBuffer.append(message);

                        // 10. 检查是否收到完整的一段内容(以换行符为结束标志)
                        if (clientBuffer.toString().contains("\n")) {
                            String fullMessage = clientBuffer.toString().trim();
                            System.out.println("Received full message from client: " + fullMessage);

                            // 11. 将完整的内容写回客户端
                            ByteBuffer responseBuffer = ByteBuffer.wrap(("Echo: " + fullMessage + "\n").getBytes());
                            clientChannel.write(responseBuffer);

                            // 12. 清空客户端的数据缓冲区
                            clientBuffer.setLength(0);
                        }
                    } else if (bytesRead < 0) {
                        // 客户端关闭连接
                        System.out.println("Client disconnected: " + clientChannel.getRemoteAddress());
                        clientData.remove(clientChannel); // 移除客户端数据缓冲区
                        clientChannel.close();
                    }
                }
                // 13. 移除已处理的事件
                keyIterator.remove();
            }
        }
    }
}

运行 NIOEchoServer,服务器会监听 8080 端口。

可以使用 telnet 或 nc 工具连接服务器:telnet localhost 8080

 

全部评论