原创

Java IO


一、IO模型

简单理解:用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。

Java功支持三种IO模型:BIO,NIO,AIO

Java BIO:同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。适用于连接数目较小且固定的架构。

Java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器(Selector)上,多路复用器轮询到连接有IO请求就进行处理。适用于连接数目多且连接比较短的架构。

Java AIO:异步非阻塞,AIO引入异步通道的 概念,采用了Proactor模式,简化了程序的编写,有效的请求才启动线程,特点是先由操作系统完成后才通知服务端启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

1、BIO

编程简单流程:

  1. 服务器端启动一个ServerSocket
  2. 客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户端建立一个线程与之通信
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
  4. 如果有响应,客户端线程会等待请求结束后,再继续执行

案例:使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时就启动一个线程与之通信。

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/
 * BIO示例
 * 线程池机制
 * 思路:
 * 1.创建一个线程池
 * 2.有客户端连接,就创建一个线程,与之通信
 *
 * @author lzlg
 * 2020/8/6 8:32
 */
public class BIODemo {

    private static final ExecutorService pool = Executors.newCachedThreadPool();

    private static class BIOServer {

        private int port;

        private BIOServer(int port) {
            this.port = port;
            start();
        }

        public static BIOServer build(int port) {
            return new BIOServer(port);
        }

        private void start() {
            try (ServerSocket serverSocket = new ServerSocket(this.port)) {
                System.out.println("服务器启动了");
                while (true) {
                    printThreadInfo();

                    System.out.println("等待连接.....");
                    final Socket socket = serverSocket.accept();
                    System.out.println("连接到一个客户端==" + socket.toString());
                    pool.execute(() -> {

                        handler(socket);
                    });
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void handler(Socket socket) {
            printThreadInfo();
            try (InputStream is = socket.getInputStream()) {
                int read;
                byte[] bytes = new byte[1024];
                while ((read = is.read(bytes)) != -1) {
                    printThreadInfo();
                    System.out.println(new String(bytes, 0, read));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /
     * 打印当前线程的相关信息
     */
    private static void printThreadInfo() {
        System.out.println("当前线程id= " + Thread.currentThread().getId() +
                ", 当前线程名称=" + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        BIOServer.build(6666);
    }
}

问题分析:

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据read,业务处理完成后,数据write
  2. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在read操作上,造成线程资源浪费

2、NIO

三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)

  1. 每个Selector都会对应一个Buffer
  2. 一个Selector对应一个线程,一个线程对应多个Channel(连接)
  3. 可有多个Channel注册到Selector上
  4. 程序切换Channel是由事件Event决定的
  5. Selector会根据不同的事件,在各个Channel(通道)上切换
  6. Buffer是一个内存块,底层是一个数组
  7. 数据的读取写入是通过Buffer,Buffer可读可写,需要flip方法进行切换
  8. Channel是双向的,可返回底层操作系统的情况

NIO是面向缓冲区,或者面向块编程的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。

NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但仅能得到目前可用的数据,如果当前没有数据可读,而不阻塞,可以继续做其他事情,直到有数据可读。非阻塞写也是如此。NIO可以做到一个线程处理多个操作。

二、NIO核心

1、Buffer(缓冲区)

简单介绍:

Buffer本质上是一个可读写数据的内存块,可理解成一个容器对象(包含数组),Buffer对象提供了一组轻松使用该内存块的方法,且内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的通道,但是读取和写入的数据必须经由Buffer。

Buffer类和其子类:

Buffer对象是个顶层父类,是个抽象类,其子类有:ByteBuffer,ShortBuffer,CharBuffer,IntBuffer,LongBuffer,DoubleBuffer,FloatBuffer。

四个属性:

// Invariants: mark <= position <= limit <= capacity
private int mark = -1; // 用于标记,可在读写过程中设置,可返回上次的标记位
private int position = 0; // 位置,下一个要被读或写的元素的索引,每次读写都会改变该值
private int limit; // 表示缓冲区的当前终点,不能对缓冲区超过limit的位置进行读写操作,可修改
private int capacity; // 容量,可容纳的最大数据量,缓冲区创建时被设定且不能被改变

简单的Buffer示例:

private static class BasicBuffer {
        public static void main(String[] args) {
            // 创建一个Buffer,大小为5,可存放5个int数据
            IntBuffer intBuffer = IntBuffer.allocate(5);
            // 存放数据
            for (int i = 0; i < intBuffer.capacity(); i++) {
                intBuffer.put(i * 2);
            }
            // 将Buffer转换,重置属性,使之可以进行读取操作
            /
             *     public final Buffer flip() {
             *         limit = position;
             *         position = 0;
             *         mark = -1;
             *         return this;
             *     }
             */
            intBuffer.flip();
            intBuffer.position(1);// 从索引是1的位置开始读取
            intBuffer.limit(3);// 对读取的数据进行限制,不能超过3
            // 循环读取,并输出
            while (intBuffer.hasRemaining()) {
                System.out.println(intBuffer.get());
            }
        }
    }

常用方法说明:

public abstract class Buffer {
    // JDK1.4时的API
    public final int capacity(); // 返回缓冲区的容量
    public final int position(); // 返回缓冲区的位置
    public final Buffer position(int newPosition); // 设置缓冲区的位置
    public final int limit(); // 返回缓冲区的限制
    public final Buffer limit(int newLimit); // 设置缓冲区的限制
    public final Buffer mark(); // 在当前的位置设置标记
    public final Buffer reset(); // 将当前的位置重置为以前标记的位置
    public final Buffer clear(); // 清除缓冲区,将各个标记恢复到初始状态,数据没有真正的擦除
    public final Buffer flip(); // 反转此缓冲区,读写切换
    public final Buffer rewind(); // 重绕此缓冲区
    public final int remaining(); // 返回当前位置和限制之间的元素数量
    public final boolean hasRemaining(); // 在当前位置和限制之间是否有元素
    public abstract boolean isReadOnly(); // 此缓冲区是否是只读缓冲区
    
    // JDK1.6后的API
    public abstract boolean hasArray(); // 缓冲区是否具有可访问的底层实现数组
    public abstract Object array(); // 返回缓冲区的底层实现数组
    public abstract int arrayOffset(); // 返回缓冲区的底层实现数组中第一个元素的偏移量
    public abstract boolean isDirect(); // 此缓冲区是否是直接缓冲区
}

// 子类ByteBuffer
public abstract class ByteBuffer {
    // 缓冲区相关的API
    public static ByteBuffer allocateDirect(int capacity); // 创建直接缓冲区
    public static ByteBuffer allocate(int capacity); // 设置缓冲区的初始容量
    public static ByteBuffer wrap(byte[] array); // 把一个数组放到缓冲区使用
    public static ByteBuffer wrap(byte[] array, int offset, int length); // 构造初始化位置为offset和上界length的缓冲区
    
    // 缓冲区存取相关API
    public abstract byte get(); // 从当前位置position上get,get之后,position自动+1
    public abstract byte get(int index); // 从绝对位置get
    public abstract ByteBuffer put(byte b); // 从当前位置position上put,put之后,position自动+1
    public abstract ByteBuffer put(int index, byte b); // 从绝对位置put
}

注意事项:

import java.nio.ByteBuffer;

/
 * ByteBuffer 类型化 Put Get
 *
 * @author lzlg
 * 2020/8/8 19:23
 */
public class NIOByteBufferPutGet {
    public static void main(String[] args) {
        // Put时候放入什么数据, Get时候就使用对应的类型获取
        // 否则会有 java.nio.BufferUnderflowException 异常抛出
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putInt(100);
        buffer.putLong(18L);
        buffer.putChar('&');
        buffer.putShort((short) 8);

        buffer.flip();

        System.out.println(buffer.getInt());
        System.out.println(buffer.getLong());
        System.out.println(buffer.getLong()); // 错误
        System.out.println(buffer.getShort());

        // 可转换成只读的ByteBuffer
        ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
        // 如果再放入则抛出 java.nio.ReadOnlyBufferException 异常
        readOnlyBuffer.putChar('h'); // 错误
    }
}

MappedByteBuffer:

可让文件直接在内存(堆外内存)中进行修改,操作系统无需拷贝一次,同步文件由NIO来完成。

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

/
 * MappedByteBuffer
 * 可让文件直接在内存(堆外内存)中进行修改,操作系统无需拷贝一次,同步文件由NIO来完成。
 *
 * @author lzlg
 * 2020/8/8 19:35
 */
public class NIOMappedByteBufferDemo {
    public static void main(String[] args) {
        // 获取文件信息
        try (RandomAccessFile randomAccessFile = new RandomAccessFile("test01.txt", "rw")) {
            // 获取通道
            FileChannel fileChannel = randomAccessFile.getChannel();
            /
             * 参数说明:MapMode mode, long position, long size
             * 1.mode 映射模式, 当前是读写类型
             * 2.position 可以在内存中直接修改的起始位置
             * 3.size 映射到内存的大小,即将文件test01.txt的多少个字节映射到内存中
             *
             * 下面创建的对象说明: 可直接修改的起始位置从1开始, 可修改的字节数为5个
             */
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 1, 5);
            mappedByteBuffer.put(0, (byte) 'N');
            mappedByteBuffer.put(3, (byte) '6');
            mappedByteBuffer.put(5, (byte) 'P');// 错误,size为5,只能在下标为[0~4]之间修改
            // 会抛出 java.lang.IndexOutOfBoundsException 异常
            System.out.println("修改成功.....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、Channel(通道)

简单介绍:

Channel在NIO中是一个接口,常用的Channel类:FileChannel(文件数据的读写),DatagramChannel(用于UDP的数据读写),ServerSocketChannel和SocketChannel(用于TCP的数据读写)。

和流的区别:

类似于流,但有区别:1.通道可同时进行读写,2.通道可实现异步读写数据,3.通道可以从缓冲区读取数据,也可写数据到缓冲区中。注意:流对象包含通道的对象。

FileChannel类:

常用方法:注意在通道对象中,数据从通道到缓冲区是读操作,数据从缓冲区到通道是写操作。

  1. public int read(ByteBuffer dst); 从通道读取数据并放到缓冲区中
  2. public int write(ByteBuffer src); 把缓冲区的数据写入到通道中
  3. public long transferFrom(ReadableByteChannel src, long position, long count); 把目标通道src中复制数据到当前通道中
  4. public long transferTo(long position, long count, WritableByteChannel target); 把当前通道的数据复制到目标通道target中

FileChannel示例程序:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

/
 * NIO File Channel示例
 *
 * @author lzlg
 * 2020/8/8 18:42
 */
public class NIOFileChannelDemo {

    public static void main(String[] args) {
//        write();

//        read();

//        copy();

        transfer();
    }

    /
     * 使用FileChannel写数据
     */
    private static void write() {
        String str = "Hello,World,环境是客户付款就";
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        // 创建一个输出流
        try (FileOutputStream fos = new FileOutputStream("E:\\test\\file01.txt")) {
            // 从流中获取通道
            FileChannel fileChannel = fos.getChannel();
            // 创建ByteBuffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
            // 将数据写入缓冲区
            byteBuffer.put(bytes);
            // flip切换
            byteBuffer.flip();
            // 把ByteBuffer中数据写入到Channel中
            fileChannel.write(byteBuffer);

            fileChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /
     * 使用FileChannel读取数据
     */
    private static void read() {
        // 创建一个输入流
        try (FileInputStream fis = new FileInputStream("E:\\test\\file01.txt")) {
            // 从流中获取通道
            FileChannel fileChannel = fis.getChannel();
            // 创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(fis.available());
            // 将通道中的数据读取到ByteBuffer中
            fileChannel.read(byteBuffer);
            // 打印ByteBuffer中的数据
            System.out.println(new String(byteBuffer.array(), StandardCharsets.UTF_8));

            fileChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /
     * 使用FileChannel拷贝文件
     */
    private static void copy() {
        // 创建输入和输出流
        try (FileInputStream fis = new FileInputStream("test01.txt");
             FileOutputStream fos = new FileOutputStream("test02.txt")) {
            // 获取流对应的通道
            FileChannel fisChannel = fis.getChannel();
            FileChannel fosChannel = fos.getChannel();
            // 创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
            // 把输入流通道中的数据读取到缓冲区中
            while (fisChannel.read(byteBuffer) != 0) {
                // ByteBuffer切换
                byteBuffer.flip();
                // 将缓冲区的数据写入到输出流的通道中
                fosChannel.write(byteBuffer);
                // !!! 重要,写入完成后,需倒带缓冲区,
                // 否则limit和position相等,下次读取一直是0,进入死循环
                byteBuffer.rewind();
            }
            fisChannel.close();
            fosChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /
     * 使用FileChannel传输文件
     */
    private static void transfer() {
        // 创建输入和输出流
        try (FileInputStream fis = new FileInputStream("E:\\test\\activity-1.0.0.jar");
             FileOutputStream fos = new FileOutputStream("activity-1.0.0.jar")) {
            // 获取流对应的通道
            FileChannel srcChannel = fis.getChannel();
            FileChannel dstChannel = fos.getChannel();
            // 直接使用transferFrom方法
            dstChannel.transferFrom(srcChannel, 0, srcChannel.size());

            srcChannel.close();
            dstChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Scattering和Gathering:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

/
 * Scattering: 将数据写入到Buffer中时可以使用Buffer数组,依次写入
 * Gathering: 从Buffer中读取数据时,可以使用Buffer数组,依次读取
 *
 * @author lzlg
 * 2020/8/8 20:06
 */
public class NIOScatteringGatheringDemo {
    public static void main(String[] args) {
        // 创建 ServerSocketChannel
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            // 进行端口的绑定
            InetSocketAddress inetSocketAddress = new InetSocketAddress(7888);
            serverSocketChannel.socket().bind(inetSocketAddress);

            // 创建ByteBuffer数组
            ByteBuffer[] buffers = new ByteBuffer[2];
            buffers[0] = ByteBuffer.allocate(5);
            buffers[1] = ByteBuffer.allocate(3);

            // 接收客户端的请求
            SocketChannel socketChannel = serverSocketChannel.accept();
            int limit = 8;

            while (true) {
                // 读取数据到Buffer数组中
                long read = 0;
                while (read < limit) {
                    long r = socketChannel.read(buffers);
                    read += r;
                    System.out.println("byte read count: " + read);
                    // 打印Buffer的情况
                    Arrays.stream(buffers).forEach(buffer -> System.out.println("position: " +
                            buffer.position() + ", limit: " + buffer.limit()));
                }

                // 将所有的Buffer进行flip
                Arrays.asList(buffers).forEach(ByteBuffer::flip);

                // 将所有的数据写出到客户端
                long write = 0;
                while (write < limit) {
                    long w = socketChannel.write(buffers);
                    write += w;
                }

                // 将所有的Buffer进行clear
                Arrays.asList(buffers).forEach(ByteBuffer::clear);

                System.out.println("byte read: " + read + ", byte write: " + write
                        + ", data limit: " + limit);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

3、Selector(选择器)

简单介绍:

Selector能够检测多个注册的通道上是否有事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理,这样就可以只用一个单线程去管理多个通道。

只有在连接通道真正有读写事件发生时,才会进行读写,大大减少系统开销,不必为每个连接创建线程,同时避免了多线程之间上下文切换导致的开销。

特点:

  1. Netty的IO线程NioEventLoop 聚合了Selector(选择器,也叫多路复用器),可同时并发处理成百上千个客户端连接。
  2. 当线程从客户端Socket通道进行读写数据时,若没有数据可以时,该线程可执行其他任务。
  3. 线程通常将非阻塞IO的空闲时间用于其他通道上执行IO操作,单独的线程可管理多个输入和输出通道。
  4. 读写操作都是非阻塞的,充分提升IO线程的运行效率,避免IO阻塞造成的线程挂起。
  5. 一个IO线程可并发处理N个客户端连接和读写操作,架构的性能,弹性伸缩能力和可靠能力都得到提升。

Selector类和相关方法:

Selector是个抽象类,常用的方法:

public abstract class Selector implements Closeable {
    public static Selector open(); // 得到一个Selector对象
    public int select(long timeout); // 监控所有注册的通道,当其中有相关IO操作时,将对应的SelectionKey加入到内部集合中,timeout超时时间
    public int select(); // 阻塞的select操作
    public int selectNow(); // 非阻塞的select操作
    public abstract Selector wakeup(); // 唤醒正在阻塞的Selector 
    public Set<SelectionKey> selectedKeys(); // 返回当前的SelectionKey集合
}

4、NIO网络编程原理

NIO网络编程相关类(Selector,SelectionKey,ServerSocketChannel,SocketChannel)关系梳理:

  1. ServerSocketChannel监听端口,当有客户端连接时生成对应的SocketChannel;ServerSocketChannel也要注册进Selector中,才能监听到客户端连接事件。
  2. Selector负责监听,调用select() 方法,返回有事件发生的通道的个数。
  3. 将SocketChannel注册到Selector上,注册方法register(Selector sel, int ops, Object att),并返回SelecionKey,一个Selector上可注册多个SocketChannel。
  4. 有注册的事件发生时,调用方法selectedKeys() 可获取当前SelecionKey集合。
  5. 通过SelecionKey的channel() 方法可反向获取 SocketChannel,然后完成业务的处理
  6. 事件类型:int OP_READ = 1 << 0; int OP_WRITE = 1 << 2; int OP_CONNECT = 1 << 3; int OP_ACCEPT = 1 << 4;

代码示例:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/
 * NIO编程示例
 *
 * @author lzlg
 * 2020/8/9 11:41
 */
public class NIOServerClientDemo {
    /
     * NIO服务端
     */
    private static class NIOServer {
        public static void main(String[] args) {
            // 创建一个服务端的 ServerSocketChannel
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
                // 绑定本地端口8888
                serverSocketChannel.socket().bind(new InetSocketAddress(8888));
                // 设置通道为非阻塞
                serverSocketChannel.configureBlocking(false);
                // 创建一个选择器
                Selector selector = Selector.open();
                // 将 ServerSocketChannel 注册进Selector中, 注册事件:接收连接 OP_ACCEPT
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

                while (true) {
                    // 每隔 1 秒检测是否有连接的事件发生
                    if (selector.select(1000) == 0) {
                        System.out.println("没有客户端连接, 等待 1 秒.....");
                        continue;
                    }
                    // 如果有连接事件发生, 则获取相关的 SelectionKey集合
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    // 使用迭代器遍历 SelectionKey集合
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey selectionKey = keyIterator.next();
                        // 如果是可接收状态
                        if (selectionKey.isAcceptable()) {
                            // 则ServerSocketChannel接收客户端请求
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            // 注意要设置为非阻塞
                            socketChannel.configureBlocking(false);
                            // 并将客户端请求获取到的 SocketChannel 注册入 Selector中
                            socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(512));
                        }
                        // 如果当前有SelectionKey是可读的
                        if (selectionKey.isReadable()) {
                            // 从 selectionKey 中获取 SocketChannel
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            // 从 selectionKey 中获取 ByteBuffer
                            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                            // 将通道中的数据读取到Buffer中
                            channel.read(buffer);
                            System.out.println("从客户端获取到消息: " + new String(buffer.array()));
                        }

                        // 将迭代后的元素, 进行删除,
                        // 否则一个客户端连接后, serverSocketChannel的事件改变, 造成空指针
                        keyIterator.remove();
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /
     * NIO客户端
     */
    private static class NIOClient {
        public static void main(String[] args) {
            // 创建一个Socket通道
            try (SocketChannel socketChannel = SocketChannel.open()) {
                // 设置通道为非阻塞
                socketChannel.configureBlocking(false);
                // 设置服务器地址
                InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8888);
                // 如果没有连接上, 则进行其他操作
                if (!socketChannel.connect(inetSocketAddress)) {
                    while (!socketChannel.finishConnect()) {
                        System.out.println("连接需要时间, 客户端不会阻塞, 可以做其他工作.....");
                    }
                }
                // 创建ByteBuffer
                String str = "Hello,李哲龙,张玉蒙";
                ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
                // 将ByteBuffer的数据读取到Channel中
                socketChannel.write(buffer);

                // 阻塞客户端
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

5、其他对象

SelectionKey:表示Selector和通道的注册关系,共四种:

  1. int OP_READ = 1 << 0; 读操作,值为 1
  2. int OP_WRITE = 1 << 2; 写操作,值为 4
  3. int OP_CONNECT = 1 << 3; 连接已经建立,值为 8
  4. int OP_ACCEPT = 1 << 4; 有新的网络连接可以 Accept,值为 16

selector方法中keys() 方法返回所有注册的SelectionKey;selectedKeys()方法返回的是发生事件的SelectionKey。

相关的API:

public abstract class SelectionKey {
    public abstract Selector selector(); // 得到与之关联的 Selector
    public abstract SelectableChannel channel(); // 得到与之关联的 Channel
    public final Object attachment(); // 得到与之关联的共享数据
    public abstract SelectionKey interestOps(int ops); // 设置或改变监听事件
    public final boolean isAcceptable(); // 是否可以 accept
    public final boolean isReadable(); // 是否可读
    public final boolean isWritable(); // 是否可写
}

SeverSocketChannel:用于服务器监听新的客户端Socket连接

相关的API:

public abstract class SeverSocketChannel extends AbstractSelectableChannel
    implements NetworkChannel {
    public static SeverSocketChannel open(); // 得到一个SeverSocketChannel
    public final SeverSocketChannel bind(SocketAddress local); // 设置服务器端口号
    public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public SocketChannel accept(); // 接收一个连接,返回代表这个连接的 SocketChannel
    public final SelectionKey register(Selector sel, int ops); // 注册一个选择器,并设置监听事件
}

SocketChannel:网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或把通道中的数据读取到缓冲区中。

相关的API:

public abstract class public abstract class SocketChannel
    extends AbstractSelectableChannel
    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
    public static SocketChannel open(); // 得到一个SocketChannel
    public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public boolean connect(SocketAddress remote); // 连接服务器
    public boolean finishConnect(); // 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
    public int write(ByteBuffer src); // 往通道中写数据
    public int read(ByteBuffer src); // 从通道中读数据
    public final SelectionKey register(Selector sel, int ops, Object att); // 注册一个选择器,并设置监听事件,最后一个参数可设置共享数据 
    public final void close(); // 关闭通道
}

6、群聊系统案例

服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/
 * 群聊系统服务端
 *
 * @author lzlg
 * 2020/8/15 18:47
 */
public class GroupChatServer {

    private Selector selector; // 选择器

    private ServerSocketChannel channel; // 服务器Socket通道

    private static final int PORT = 8675; // 服务器端口号

    /
     * 提供外部可访问的静态构造方法
     */
    public static GroupChatServer build() {
        return new GroupChatServer();
    }

    /
     * 构造器
     */
    private GroupChatServer() {
        try {
            // 创建选择器
            selector = Selector.open();
            // 创建 服务器Socket通道
            channel = ServerSocketChannel.open();
            // 绑定端口
            channel.socket().bind(new InetSocketAddress(PORT));
            // 设置非阻塞
            channel.configureBlocking(false);

            // 注册为Accept事件
            channel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server is start.....");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /
     * 服务器启动
     * 1.监听客户端发送的消息
     * 2.将客户端发送的消息转发到其他客户端
     */
    public void run() {
        try {
            while (true) {
                // 监听选择器中发生的事件
                int select = selector.select();
                if (select > 0) {
                    // 获取所有的发生事件的SelectionKey
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        // 迭代 SelectionKey
                        SelectionKey key = it.next();
                        // 如果是接收事件,则打开客户端
                        if (key.isAcceptable()) {
                            // 获取监听客户端的Channel
                            SocketChannel channel = this.channel.accept();
                            // 设置为非阻塞,并注册为read事件
                            channel.configureBlocking(false);
                            channel.register(selector, SelectionKey.OP_READ);
                            // 标记客户端上线
                            System.out.println(channel.getRemoteAddress() + " 上线了.....");
                        }
                        // 如果是读取事件,则读取消息
                        if (key.isReadable()) {
                            // 获取到消息内容
                            String msg = read(key);

                            // 把消息转发到其他通道上
                            forward(msg, key);
                        }

                        // 注意这里要清除当前循环的key
                        it.remove();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    /
     * 读取消息
     *
     * @param key 有事件发生的通道
     */
    private String read(SelectionKey key) {
        SocketChannel channel = null;
        try {
            // 获取对应的通道
            channel = (SocketChannel) key.channel();
            // 使用8字节的buffer进行读取
            int read;
            ByteBuffer buffer = ByteBuffer.allocate(8);
            StringBuilder sb = new StringBuilder();
            while ((read = channel.read(buffer)) != 0) {
                sb.append(new String(buffer.array(), 0, read));
                // 这里必须进行rewind(),否则Buffer不能再次使用
                buffer.rewind();
            }
            // 获取到消息内容
            String msg = sb.toString();
            System.out.println("收到消息: " + msg);
            return msg;
        } catch (IOException e) {
            try {
                // client离线
                System.out.println(channel.getRemoteAddress() + " 离线了.....");
                // 取消注册
                key.cancel();
                // 关闭通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            return null;
        }
    }

    /
     * 将消息转发到其他通道上
     *
     * @param msg        消息内容
     * @param excludeKey 排除的通道
     */
    private void forward(String msg, SelectionKey excludeKey) throws IOException {
        if (msg == null) return;
        // 获取所有的通道
        Set<SelectionKey> keys = selector.keys();
        // 遍历
        for (SelectionKey key : keys) {
            SelectableChannel channel = key.channel();
            // 如果是SocketChannel, 且不为排除的通道
            if (channel instanceof SocketChannel && channel != excludeKey.channel()) {
                // 进行数据的写入
                SocketChannel socketChannel = (SocketChannel) channel;
                // 把Buffer中的数据写入通道中
                socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
            }
        }
    }

    public static void main(String[] args) {
        GroupChatServer.build().run();
    }
}

客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/
 * 群聊系统客户端
 *
 * @author lzlg
 * 2020/8/15 18:47
 */
public class GroupChatClient {

    private Selector selector; // 选择器

    private SocketChannel channel; // 通道

    private String name;

    private static final String SERVER_IP = "127.0.0.1";

    private static final int SERVER_PORT = 8675;

    private static final ExecutorService pool = Executors.newFixedThreadPool(4);

    /
     * 暴露给外部的构建方法
     */
    public static GroupChatClient build() {
        return new GroupChatClient();
    }

    private GroupChatClient() {
        try {
            // 创建选择器
            selector = Selector.open();
            // 创建Socket通道 通道绑定服务端ip和端口
            channel = SocketChannel.open(new InetSocketAddress(SERVER_IP, SERVER_PORT));
            // 设置非阻塞
            channel.configureBlocking(false);
            // 注册read事件
            channel.register(selector, SelectionKey.OP_READ);

            this.name = channel.getLocalAddress().toString().substring(1);
            System.out.println(this.name + " is ok.....");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /
     * 启动客户端
     */
    public void start() {
        // 使用线程池,执行读取消息的任务
        pool.execute(() -> {
            try {
                while (true) {
                    // 每一秒进行监听
                    int select = selector.select();
                    if (select > 0) {
                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                        while (it.hasNext()) {
                            SelectionKey key = it.next();
                            if (key.isReadable()) {
                                SocketChannel channel = (SocketChannel) key.channel();
                                int read;
                                StringBuilder sb = new StringBuilder();
                                ByteBuffer buffer = ByteBuffer.allocate(8);
                                while ((read = channel.read(buffer)) != 0) {
                                    sb.append(new String(buffer.array(), 0, read));
                                    buffer.rewind();
                                }
                                System.out.println("读取到消息: " + sb.toString());
                            }
                        }

                        it.remove();
                    }

                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        try {
            // 主线程发送消息
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入: ");
            while (scanner.hasNextLine()) {
                String msg = this.name + " 说: " + scanner.nextLine();
                System.out.println(msg);
                this.channel.write(ByteBuffer.wrap(msg.getBytes()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient.build().start();
    }
}

7、零拷贝

在Java程序中,常用的零拷贝有 MMAP(内存映射)和 sendFile。

DMA(Direct Memory Access,直接存储器访问)

https://zhuanlan.zhihu.com/p/78869158

传统的IO需要4次上下文的切换(用户态和内核态),4次数据的Copy。

MMAP优化:

mmap通过内存映射,将文件映射到内核缓冲区,同时用户空间可共享内核空间的数据,这样减少一次内核空间到用户空间的拷贝。

sendFile优化:

Linux2.1版本提供了sendFile函数,数据不经过用户态,直接从内存缓冲区进入到SocketBuffer,减少了一次上下文切换。

Linux2.4版本进行了以下修改,避免了从内存缓冲区拷贝到SocketBuffer的操作,直接拷贝到协议栈,再次减少了一次数据拷贝。

注意:这里还是有从内存缓冲区拷贝到SocketBuffer的操作,只是拷贝的信息很少,比如文件长度,offset,消耗低,可忽略。

  1. mmap适合小数据量读写,sendFile适合大文件传输。
  2. mmap需要4次上下文切换和3次数据拷贝,sendFile需要3次上下文切换,最少2次数据拷贝。
  3. sendFile可以利用DMA方式,减少CPU拷贝,mmap则不能。

splice:

Linux 2.6.17 支持splice,数据从磁盘读取到OS内核缓冲区后,在内核缓冲区直接可将其转成内核空间其他数据buffer,而不需要拷贝到用户空间。

从磁盘读取到内核buffer后,在内核空间直接与socket buffer建立pipe管道。 和sendfile()不同的是,splice()不需要硬件支持。

注意splice和sendfile的不同,sendfile是将磁盘数据加载到kernel buffer后,需要一次CPU copy,拷贝到socket buffer。而splice是更进一步,连这个CPU copy也不需要了,直接将两个内核空间的buffer进行set up pipe。

splice会经历 2次拷贝: 0次cpu copy 2次DMA copy;以及2次上下文切换。

概念:

零拷贝是从操作系统角度,指没有CPU拷贝,因为内存缓冲区之间,没有数据是重复的。不仅带来更少的数据复制,还能带来其他的性能优势,比如更少的上下文切换,更少的CPU缓存伪共享以及无CPU校验和计算。

案例代码:

/
 * 不知道怎么分类的小工具
 *
 * @author lzlg
 * 2020/8/16 11:33
 */
public class Tool {

    private Tool() {
    }

    /
     * 计算方法运行事件
     *
     * @param runnable 方法
     */
    public static void time(Runnable runnable) {
        long start = System.currentTimeMillis();
        runnable.run();
        long end = System.currentTimeMillis();
        System.out.println("运行时间(毫秒): " + (end - start));
    }
}

import java.io.DataInputStream;
import java.net.ServerSocket;
import java.net.Socket;

/
 * 传统IO服务端
 *
 * @author lzlg
 * 2020/8/16 11:40
 */
public class OldIOServer {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8761);
        while (true) {
            Socket socket = serverSocket.accept();
            DataInputStream dis = new DataInputStream(socket.getInputStream());
            int read;
            byte[] bytes = new byte[1024];
            while ((read = dis.read(bytes)) != -1) {
//                System.out.println("old server is read. " + read);
            }
            dis.close();
        }
    }
}

import com.lzlg.study.util.Tool;

import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/
 * 传统IO客户端
 *
 * @author lzlg
 * 2020/8/16 11:40
 */
public class OldIOClient {
    public static void main(String[] args) {
        Tool.time(() -> {
            try {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress("127.0.0.1", 8761));
                FileInputStream fis = new FileInputStream("activity-1.0.0.jar");
                byte[] bytes = new byte[1024];
                DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
                int read;
                while ((read = fis.read(bytes)) != -1) {
                    dos.write(bytes, 0, read);
//                    System.out.println("old client is write. " + read);
                }
                fis.close();
                dos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/
 * 非阻塞IO服务端
 *
 * @author lzlg
 * 2020/8/16 11:40
 */
public class NewIOServer {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9876));
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int read;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while ((read = socketChannel.read(buffer)) != -1) {
                buffer.rewind();
            }
        }
    }
}

import com.lzlg.study.util.Tool;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

/
 * 非阻塞IO客户端
 *
 * @author lzlg
 * 2020/8/16 11:40
 */
public class NewIOClient {
    public static void main(String[] args) {
        Tool.time(() -> {
            try {
                SocketChannel socketChannel = SocketChannel.open();
                socketChannel.connect(new InetSocketAddress("127.0.0.1", 9876));
                FileChannel fileChannel = new FileInputStream("activity-1.0.0.jar").getChannel();
                // windows环境下零拷贝只有8M
                final long M_8 = 8 * 1024 * 1024L;
                // 文件大小
                long size = fileChannel.size();
                // 计算传输次数
                long count = size / M_8 + 1;
                for (int i = 0; i < count; i++) {
                    fileChannel.transferTo(i * M_8, M_8, socketChannel);
                }
//                fileChannel.transferTo(0, fileChannel.size(), socketChannel);
                socketChannel.close();
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

    }
}

8、AIO

JDK 7 引入了 Asynchronous I/O,即AIO。在进行 I/O 编程中,常用两种模式:Reactor模式和Proactor模式。

Java的NIO就是Reactor模式,当有事件发生时,服务端得到通知进行相应的处理。

AIO:异步不阻塞的IO,引入异步通道的概念,采用Proactor模式,有效的请求才启动线程。特点是先由操作系统完成后才通知服务端程序启动线程去处理,适用于连接数较多且连接时长较长的应用。

BIONIOAIO
IO模型同步阻塞同步非阻塞(多路复用)异步非阻塞
编程难度简单复杂复杂
可靠性
吞吐量
程序员内功
  • 作者:lzlg520
  • 发表时间:2020-08-06 13:18
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 公众号转载:请在文末添加作者公众号二维码