简单理解:用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。
Java功支持三种IO模型:BIO,NIO,AIO
Java BIO:同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。适用于连接数目较小且固定的架构。
Java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器(Selector)上,多路复用器轮询到连接有IO请求就进行处理。适用于连接数目多且连接比较短的架构。
Java AIO:异步非阻塞,AIO引入异步通道的 概念,采用了Proactor模式,简化了程序的编写,有效的请求才启动线程,特点是先由操作系统完成后才通知服务端启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
编程简单流程:
案例:使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时就启动一个线程与之通信。
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/
* BIO示例
* 线程池机制
* 思路:
* 1.创建一个线程池
* 2.有客户端连接,就创建一个线程,与之通信
*
* @author lzlg
* 2020/8/6 8:32
*/
public class BIODemo {
private static final ExecutorService pool = Executors.newCachedThreadPool();
private static class BIOServer {
private int port;
private BIOServer(int port) {
this.port = port;
start();
}
public static BIOServer build(int port) {
return new BIOServer(port);
}
private void start() {
try (ServerSocket serverSocket = new ServerSocket(this.port)) {
System.out.println("服务器启动了");
while (true) {
printThreadInfo();
System.out.println("等待连接.....");
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端==" + socket.toString());
pool.execute(() -> {
handler(socket);
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handler(Socket socket) {
printThreadInfo();
try (InputStream is = socket.getInputStream()) {
int read;
byte[] bytes = new byte[1024];
while ((read = is.read(bytes)) != -1) {
printThreadInfo();
System.out.println(new String(bytes, 0, read));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/
* 打印当前线程的相关信息
*/
private static void printThreadInfo() {
System.out.println("当前线程id= " + Thread.currentThread().getId() +
", 当前线程名称=" + Thread.currentThread().getName());
}
public static void main(String[] args) {
BIOServer.build(6666);
}
}
问题分析:
三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)
NIO是面向缓冲区,或者面向块编程的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但仅能得到目前可用的数据,如果当前没有数据可读,而不阻塞,可以继续做其他事情,直到有数据可读。非阻塞写也是如此。NIO可以做到一个线程处理多个操作。
简单介绍:
Buffer本质上是一个可读写数据的内存块,可理解成一个容器对象(包含数组),Buffer对象提供了一组轻松使用该内存块的方法,且内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的通道,但是读取和写入的数据必须经由Buffer。
Buffer类和其子类:
Buffer对象是个顶层父类,是个抽象类,其子类有:ByteBuffer,ShortBuffer,CharBuffer,IntBuffer,LongBuffer,DoubleBuffer,FloatBuffer。
四个属性:
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; // 用于标记,可在读写过程中设置,可返回上次的标记位
private int position = 0; // 位置,下一个要被读或写的元素的索引,每次读写都会改变该值
private int limit; // 表示缓冲区的当前终点,不能对缓冲区超过limit的位置进行读写操作,可修改
private int capacity; // 容量,可容纳的最大数据量,缓冲区创建时被设定且不能被改变
简单的Buffer示例:
private static class BasicBuffer {
public static void main(String[] args) {
// 创建一个Buffer,大小为5,可存放5个int数据
IntBuffer intBuffer = IntBuffer.allocate(5);
// 存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
// 将Buffer转换,重置属性,使之可以进行读取操作
/
* public final Buffer flip() {
* limit = position;
* position = 0;
* mark = -1;
* return this;
* }
*/
intBuffer.flip();
intBuffer.position(1);// 从索引是1的位置开始读取
intBuffer.limit(3);// 对读取的数据进行限制,不能超过3
// 循环读取,并输出
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}
常用方法说明:
public abstract class Buffer {
// JDK1.4时的API
public final int capacity(); // 返回缓冲区的容量
public final int position(); // 返回缓冲区的位置
public final Buffer position(int newPosition); // 设置缓冲区的位置
public final int limit(); // 返回缓冲区的限制
public final Buffer limit(int newLimit); // 设置缓冲区的限制
public final Buffer mark(); // 在当前的位置设置标记
public final Buffer reset(); // 将当前的位置重置为以前标记的位置
public final Buffer clear(); // 清除缓冲区,将各个标记恢复到初始状态,数据没有真正的擦除
public final Buffer flip(); // 反转此缓冲区,读写切换
public final Buffer rewind(); // 重绕此缓冲区
public final int remaining(); // 返回当前位置和限制之间的元素数量
public final boolean hasRemaining(); // 在当前位置和限制之间是否有元素
public abstract boolean isReadOnly(); // 此缓冲区是否是只读缓冲区
// JDK1.6后的API
public abstract boolean hasArray(); // 缓冲区是否具有可访问的底层实现数组
public abstract Object array(); // 返回缓冲区的底层实现数组
public abstract int arrayOffset(); // 返回缓冲区的底层实现数组中第一个元素的偏移量
public abstract boolean isDirect(); // 此缓冲区是否是直接缓冲区
}
// 子类ByteBuffer
public abstract class ByteBuffer {
// 缓冲区相关的API
public static ByteBuffer allocateDirect(int capacity); // 创建直接缓冲区
public static ByteBuffer allocate(int capacity); // 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array); // 把一个数组放到缓冲区使用
public static ByteBuffer wrap(byte[] array, int offset, int length); // 构造初始化位置为offset和上界length的缓冲区
// 缓冲区存取相关API
public abstract byte get(); // 从当前位置position上get,get之后,position自动+1
public abstract byte get(int index); // 从绝对位置get
public abstract ByteBuffer put(byte b); // 从当前位置position上put,put之后,position自动+1
public abstract ByteBuffer put(int index, byte b); // 从绝对位置put
}
注意事项:
import java.nio.ByteBuffer;
/
* ByteBuffer 类型化 Put Get
*
* @author lzlg
* 2020/8/8 19:23
*/
public class NIOByteBufferPutGet {
public static void main(String[] args) {
// Put时候放入什么数据, Get时候就使用对应的类型获取
// 否则会有 java.nio.BufferUnderflowException 异常抛出
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.putInt(100);
buffer.putLong(18L);
buffer.putChar('&');
buffer.putShort((short) 8);
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getLong()); // 错误
System.out.println(buffer.getShort());
// 可转换成只读的ByteBuffer
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
// 如果再放入则抛出 java.nio.ReadOnlyBufferException 异常
readOnlyBuffer.putChar('h'); // 错误
}
}
MappedByteBuffer:
可让文件直接在内存(堆外内存)中进行修改,操作系统无需拷贝一次,同步文件由NIO来完成。
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/
* MappedByteBuffer
* 可让文件直接在内存(堆外内存)中进行修改,操作系统无需拷贝一次,同步文件由NIO来完成。
*
* @author lzlg
* 2020/8/8 19:35
*/
public class NIOMappedByteBufferDemo {
public static void main(String[] args) {
// 获取文件信息
try (RandomAccessFile randomAccessFile = new RandomAccessFile("test01.txt", "rw")) {
// 获取通道
FileChannel fileChannel = randomAccessFile.getChannel();
/
* 参数说明:MapMode mode, long position, long size
* 1.mode 映射模式, 当前是读写类型
* 2.position 可以在内存中直接修改的起始位置
* 3.size 映射到内存的大小,即将文件test01.txt的多少个字节映射到内存中
*
* 下面创建的对象说明: 可直接修改的起始位置从1开始, 可修改的字节数为5个
*/
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 1, 5);
mappedByteBuffer.put(0, (byte) 'N');
mappedByteBuffer.put(3, (byte) '6');
mappedByteBuffer.put(5, (byte) 'P');// 错误,size为5,只能在下标为[0~4]之间修改
// 会抛出 java.lang.IndexOutOfBoundsException 异常
System.out.println("修改成功.....");
} catch (Exception e) {
e.printStackTrace();
}
}
}
简单介绍:
Channel在NIO中是一个接口,常用的Channel类:FileChannel(文件数据的读写),DatagramChannel(用于UDP的数据读写),ServerSocketChannel和SocketChannel(用于TCP的数据读写)。
和流的区别:
类似于流,但有区别:1.通道可同时进行读写,2.通道可实现异步读写数据,3.通道可以从缓冲区读取数据,也可写数据到缓冲区中。注意:流对象包含通道的对象。
FileChannel类:
常用方法:注意在通道对象中,数据从通道到缓冲区是读操作,数据从缓冲区到通道是写操作。
FileChannel示例程序:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
/
* NIO File Channel示例
*
* @author lzlg
* 2020/8/8 18:42
*/
public class NIOFileChannelDemo {
public static void main(String[] args) {
// write();
// read();
// copy();
transfer();
}
/
* 使用FileChannel写数据
*/
private static void write() {
String str = "Hello,World,环境是客户付款就";
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
// 创建一个输出流
try (FileOutputStream fos = new FileOutputStream("E:\\test\\file01.txt")) {
// 从流中获取通道
FileChannel fileChannel = fos.getChannel();
// 创建ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
// 将数据写入缓冲区
byteBuffer.put(bytes);
// flip切换
byteBuffer.flip();
// 把ByteBuffer中数据写入到Channel中
fileChannel.write(byteBuffer);
fileChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/
* 使用FileChannel读取数据
*/
private static void read() {
// 创建一个输入流
try (FileInputStream fis = new FileInputStream("E:\\test\\file01.txt")) {
// 从流中获取通道
FileChannel fileChannel = fis.getChannel();
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(fis.available());
// 将通道中的数据读取到ByteBuffer中
fileChannel.read(byteBuffer);
// 打印ByteBuffer中的数据
System.out.println(new String(byteBuffer.array(), StandardCharsets.UTF_8));
fileChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/
* 使用FileChannel拷贝文件
*/
private static void copy() {
// 创建输入和输出流
try (FileInputStream fis = new FileInputStream("test01.txt");
FileOutputStream fos = new FileOutputStream("test02.txt")) {
// 获取流对应的通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
// 把输入流通道中的数据读取到缓冲区中
while (fisChannel.read(byteBuffer) != 0) {
// ByteBuffer切换
byteBuffer.flip();
// 将缓冲区的数据写入到输出流的通道中
fosChannel.write(byteBuffer);
// !!! 重要,写入完成后,需倒带缓冲区,
// 否则limit和position相等,下次读取一直是0,进入死循环
byteBuffer.rewind();
}
fisChannel.close();
fosChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/
* 使用FileChannel传输文件
*/
private static void transfer() {
// 创建输入和输出流
try (FileInputStream fis = new FileInputStream("E:\\test\\activity-1.0.0.jar");
FileOutputStream fos = new FileOutputStream("activity-1.0.0.jar")) {
// 获取流对应的通道
FileChannel srcChannel = fis.getChannel();
FileChannel dstChannel = fos.getChannel();
// 直接使用transferFrom方法
dstChannel.transferFrom(srcChannel, 0, srcChannel.size());
srcChannel.close();
dstChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Scattering和Gathering:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
/
* Scattering: 将数据写入到Buffer中时可以使用Buffer数组,依次写入
* Gathering: 从Buffer中读取数据时,可以使用Buffer数组,依次读取
*
* @author lzlg
* 2020/8/8 20:06
*/
public class NIOScatteringGatheringDemo {
public static void main(String[] args) {
// 创建 ServerSocketChannel
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 进行端口的绑定
InetSocketAddress inetSocketAddress = new InetSocketAddress(7888);
serverSocketChannel.socket().bind(inetSocketAddress);
// 创建ByteBuffer数组
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(5);
buffers[1] = ByteBuffer.allocate(3);
// 接收客户端的请求
SocketChannel socketChannel = serverSocketChannel.accept();
int limit = 8;
while (true) {
// 读取数据到Buffer数组中
long read = 0;
while (read < limit) {
long r = socketChannel.read(buffers);
read += r;
System.out.println("byte read count: " + read);
// 打印Buffer的情况
Arrays.stream(buffers).forEach(buffer -> System.out.println("position: " +
buffer.position() + ", limit: " + buffer.limit()));
}
// 将所有的Buffer进行flip
Arrays.asList(buffers).forEach(ByteBuffer::flip);
// 将所有的数据写出到客户端
long write = 0;
while (write < limit) {
long w = socketChannel.write(buffers);
write += w;
}
// 将所有的Buffer进行clear
Arrays.asList(buffers).forEach(ByteBuffer::clear);
System.out.println("byte read: " + read + ", byte write: " + write
+ ", data limit: " + limit);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
简单介绍:
Selector能够检测多个注册的通道上是否有事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理,这样就可以只用一个单线程去管理多个通道。
只有在连接通道真正有读写事件发生时,才会进行读写,大大减少系统开销,不必为每个连接创建线程,同时避免了多线程之间上下文切换导致的开销。
特点:
Selector类和相关方法:
Selector是个抽象类,常用的方法:
public abstract class Selector implements Closeable {
public static Selector open(); // 得到一个Selector对象
public int select(long timeout); // 监控所有注册的通道,当其中有相关IO操作时,将对应的SelectionKey加入到内部集合中,timeout超时时间
public int select(); // 阻塞的select操作
public int selectNow(); // 非阻塞的select操作
public abstract Selector wakeup(); // 唤醒正在阻塞的Selector
public Set<SelectionKey> selectedKeys(); // 返回当前的SelectionKey集合
}
NIO网络编程相关类(Selector,SelectionKey,ServerSocketChannel,SocketChannel)关系梳理:
代码示例:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/
* NIO编程示例
*
* @author lzlg
* 2020/8/9 11:41
*/
public class NIOServerClientDemo {
/
* NIO服务端
*/
private static class NIOServer {
public static void main(String[] args) {
// 创建一个服务端的 ServerSocketChannel
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 绑定本地端口8888
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
// 设置通道为非阻塞
serverSocketChannel.configureBlocking(false);
// 创建一个选择器
Selector selector = Selector.open();
// 将 ServerSocketChannel 注册进Selector中, 注册事件:接收连接 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 每隔 1 秒检测是否有连接的事件发生
if (selector.select(1000) == 0) {
System.out.println("没有客户端连接, 等待 1 秒.....");
continue;
}
// 如果有连接事件发生, 则获取相关的 SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 使用迭代器遍历 SelectionKey集合
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
// 如果是可接收状态
if (selectionKey.isAcceptable()) {
// 则ServerSocketChannel接收客户端请求
SocketChannel socketChannel = serverSocketChannel.accept();
// 注意要设置为非阻塞
socketChannel.configureBlocking(false);
// 并将客户端请求获取到的 SocketChannel 注册入 Selector中
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(512));
}
// 如果当前有SelectionKey是可读的
if (selectionKey.isReadable()) {
// 从 selectionKey 中获取 SocketChannel
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 从 selectionKey 中获取 ByteBuffer
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
// 将通道中的数据读取到Buffer中
channel.read(buffer);
System.out.println("从客户端获取到消息: " + new String(buffer.array()));
}
// 将迭代后的元素, 进行删除,
// 否则一个客户端连接后, serverSocketChannel的事件改变, 造成空指针
keyIterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/
* NIO客户端
*/
private static class NIOClient {
public static void main(String[] args) {
// 创建一个Socket通道
try (SocketChannel socketChannel = SocketChannel.open()) {
// 设置通道为非阻塞
socketChannel.configureBlocking(false);
// 设置服务器地址
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8888);
// 如果没有连接上, 则进行其他操作
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("连接需要时间, 客户端不会阻塞, 可以做其他工作.....");
}
}
// 创建ByteBuffer
String str = "Hello,李哲龙,张玉蒙";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
// 将ByteBuffer的数据读取到Channel中
socketChannel.write(buffer);
// 阻塞客户端
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
SelectionKey:表示Selector和通道的注册关系,共四种:
selector方法中keys() 方法返回所有注册的SelectionKey;selectedKeys()方法返回的是发生事件的SelectionKey。
相关的API:
public abstract class SelectionKey {
public abstract Selector selector(); // 得到与之关联的 Selector
public abstract SelectableChannel channel(); // 得到与之关联的 Channel
public final Object attachment(); // 得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops); // 设置或改变监听事件
public final boolean isAcceptable(); // 是否可以 accept
public final boolean isReadable(); // 是否可读
public final boolean isWritable(); // 是否可写
}
SeverSocketChannel:用于服务器监听新的客户端Socket连接
相关的API:
public abstract class SeverSocketChannel extends AbstractSelectableChannel
implements NetworkChannel {
public static SeverSocketChannel open(); // 得到一个SeverSocketChannel
public final SeverSocketChannel bind(SocketAddress local); // 设置服务器端口号
public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public SocketChannel accept(); // 接收一个连接,返回代表这个连接的 SocketChannel
public final SelectionKey register(Selector sel, int ops); // 注册一个选择器,并设置监听事件
}
SocketChannel:网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或把通道中的数据读取到缓冲区中。
相关的API:
public abstract class public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
public static SocketChannel open(); // 得到一个SocketChannel
public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public boolean connect(SocketAddress remote); // 连接服务器
public boolean finishConnect(); // 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src); // 往通道中写数据
public int read(ByteBuffer src); // 从通道中读数据
public final SelectionKey register(Selector sel, int ops, Object att); // 注册一个选择器,并设置监听事件,最后一个参数可设置共享数据
public final void close(); // 关闭通道
}
服务端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/
* 群聊系统服务端
*
* @author lzlg
* 2020/8/15 18:47
*/
public class GroupChatServer {
private Selector selector; // 选择器
private ServerSocketChannel channel; // 服务器Socket通道
private static final int PORT = 8675; // 服务器端口号
/
* 提供外部可访问的静态构造方法
*/
public static GroupChatServer build() {
return new GroupChatServer();
}
/
* 构造器
*/
private GroupChatServer() {
try {
// 创建选择器
selector = Selector.open();
// 创建 服务器Socket通道
channel = ServerSocketChannel.open();
// 绑定端口
channel.socket().bind(new InetSocketAddress(PORT));
// 设置非阻塞
channel.configureBlocking(false);
// 注册为Accept事件
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server is start.....");
} catch (IOException e) {
e.printStackTrace();
}
}
/
* 服务器启动
* 1.监听客户端发送的消息
* 2.将客户端发送的消息转发到其他客户端
*/
public void run() {
try {
while (true) {
// 监听选择器中发生的事件
int select = selector.select();
if (select > 0) {
// 获取所有的发生事件的SelectionKey
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
// 迭代 SelectionKey
SelectionKey key = it.next();
// 如果是接收事件,则打开客户端
if (key.isAcceptable()) {
// 获取监听客户端的Channel
SocketChannel channel = this.channel.accept();
// 设置为非阻塞,并注册为read事件
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
// 标记客户端上线
System.out.println(channel.getRemoteAddress() + " 上线了.....");
}
// 如果是读取事件,则读取消息
if (key.isReadable()) {
// 获取到消息内容
String msg = read(key);
// 把消息转发到其他通道上
forward(msg, key);
}
// 注意这里要清除当前循环的key
it.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/
* 读取消息
*
* @param key 有事件发生的通道
*/
private String read(SelectionKey key) {
SocketChannel channel = null;
try {
// 获取对应的通道
channel = (SocketChannel) key.channel();
// 使用8字节的buffer进行读取
int read;
ByteBuffer buffer = ByteBuffer.allocate(8);
StringBuilder sb = new StringBuilder();
while ((read = channel.read(buffer)) != 0) {
sb.append(new String(buffer.array(), 0, read));
// 这里必须进行rewind(),否则Buffer不能再次使用
buffer.rewind();
}
// 获取到消息内容
String msg = sb.toString();
System.out.println("收到消息: " + msg);
return msg;
} catch (IOException e) {
try {
// client离线
System.out.println(channel.getRemoteAddress() + " 离线了.....");
// 取消注册
key.cancel();
// 关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
return null;
}
}
/
* 将消息转发到其他通道上
*
* @param msg 消息内容
* @param excludeKey 排除的通道
*/
private void forward(String msg, SelectionKey excludeKey) throws IOException {
if (msg == null) return;
// 获取所有的通道
Set<SelectionKey> keys = selector.keys();
// 遍历
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
// 如果是SocketChannel, 且不为排除的通道
if (channel instanceof SocketChannel && channel != excludeKey.channel()) {
// 进行数据的写入
SocketChannel socketChannel = (SocketChannel) channel;
// 把Buffer中的数据写入通道中
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
}
}
}
public static void main(String[] args) {
GroupChatServer.build().run();
}
}
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/
* 群聊系统客户端
*
* @author lzlg
* 2020/8/15 18:47
*/
public class GroupChatClient {
private Selector selector; // 选择器
private SocketChannel channel; // 通道
private String name;
private static final String SERVER_IP = "127.0.0.1";
private static final int SERVER_PORT = 8675;
private static final ExecutorService pool = Executors.newFixedThreadPool(4);
/
* 暴露给外部的构建方法
*/
public static GroupChatClient build() {
return new GroupChatClient();
}
private GroupChatClient() {
try {
// 创建选择器
selector = Selector.open();
// 创建Socket通道 通道绑定服务端ip和端口
channel = SocketChannel.open(new InetSocketAddress(SERVER_IP, SERVER_PORT));
// 设置非阻塞
channel.configureBlocking(false);
// 注册read事件
channel.register(selector, SelectionKey.OP_READ);
this.name = channel.getLocalAddress().toString().substring(1);
System.out.println(this.name + " is ok.....");
} catch (IOException e) {
e.printStackTrace();
}
}
/
* 启动客户端
*/
public void start() {
// 使用线程池,执行读取消息的任务
pool.execute(() -> {
try {
while (true) {
// 每一秒进行监听
int select = selector.select();
if (select > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
int read;
StringBuilder sb = new StringBuilder();
ByteBuffer buffer = ByteBuffer.allocate(8);
while ((read = channel.read(buffer)) != 0) {
sb.append(new String(buffer.array(), 0, read));
buffer.rewind();
}
System.out.println("读取到消息: " + sb.toString());
}
}
it.remove();
}
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
}
});
try {
// 主线程发送消息
Scanner scanner = new Scanner(System.in);
System.out.println("请输入: ");
while (scanner.hasNextLine()) {
String msg = this.name + " 说: " + scanner.nextLine();
System.out.println(msg);
this.channel.write(ByteBuffer.wrap(msg.getBytes()));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupChatClient.build().start();
}
}
在Java程序中,常用的零拷贝有 MMAP(内存映射)和 sendFile。
DMA(Direct Memory Access,直接存储器访问)
传统的IO需要4次上下文的切换(用户态和内核态),4次数据的Copy。
MMAP优化:
mmap通过内存映射,将文件映射到内核缓冲区,同时用户空间可共享内核空间的数据,这样减少一次内核空间到用户空间的拷贝。
sendFile优化:
Linux2.1版本提供了sendFile函数,数据不经过用户态,直接从内存缓冲区进入到SocketBuffer,减少了一次上下文切换。
Linux2.4版本进行了以下修改,避免了从内存缓冲区拷贝到SocketBuffer的操作,直接拷贝到协议栈,再次减少了一次数据拷贝。
注意:这里还是有从内存缓冲区拷贝到SocketBuffer的操作,只是拷贝的信息很少,比如文件长度,offset,消耗低,可忽略。
splice:
Linux 2.6.17 支持splice,数据从磁盘读取到OS内核缓冲区后,在内核缓冲区直接可将其转成内核空间其他数据buffer,而不需要拷贝到用户空间。
从磁盘读取到内核buffer后,在内核空间直接与socket buffer建立pipe管道。 和sendfile()不同的是,splice()不需要硬件支持。
注意splice和sendfile的不同,sendfile是将磁盘数据加载到kernel buffer后,需要一次CPU copy,拷贝到socket buffer。而splice是更进一步,连这个CPU copy也不需要了,直接将两个内核空间的buffer进行set up pipe。
splice会经历 2次拷贝: 0次cpu copy 2次DMA copy;以及2次上下文切换。
概念:
零拷贝是从操作系统角度,指没有CPU拷贝,因为内存缓冲区之间,没有数据是重复的。不仅带来更少的数据复制,还能带来其他的性能优势,比如更少的上下文切换,更少的CPU缓存伪共享以及无CPU校验和计算。
案例代码:
/
* 不知道怎么分类的小工具
*
* @author lzlg
* 2020/8/16 11:33
*/
public class Tool {
private Tool() {
}
/
* 计算方法运行事件
*
* @param runnable 方法
*/
public static void time(Runnable runnable) {
long start = System.currentTimeMillis();
runnable.run();
long end = System.currentTimeMillis();
System.out.println("运行时间(毫秒): " + (end - start));
}
}
import java.io.DataInputStream;
import java.net.ServerSocket;
import java.net.Socket;
/
* 传统IO服务端
*
* @author lzlg
* 2020/8/16 11:40
*/
public class OldIOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8761);
while (true) {
Socket socket = serverSocket.accept();
DataInputStream dis = new DataInputStream(socket.getInputStream());
int read;
byte[] bytes = new byte[1024];
while ((read = dis.read(bytes)) != -1) {
// System.out.println("old server is read. " + read);
}
dis.close();
}
}
}
import com.lzlg.study.util.Tool;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/
* 传统IO客户端
*
* @author lzlg
* 2020/8/16 11:40
*/
public class OldIOClient {
public static void main(String[] args) {
Tool.time(() -> {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 8761));
FileInputStream fis = new FileInputStream("activity-1.0.0.jar");
byte[] bytes = new byte[1024];
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
int read;
while ((read = fis.read(bytes)) != -1) {
dos.write(bytes, 0, read);
// System.out.println("old client is write. " + read);
}
fis.close();
dos.close();
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/
* 非阻塞IO服务端
*
* @author lzlg
* 2020/8/16 11:40
*/
public class NewIOServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9876));
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int read;
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((read = socketChannel.read(buffer)) != -1) {
buffer.rewind();
}
}
}
}
import com.lzlg.study.util.Tool;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
/
* 非阻塞IO客户端
*
* @author lzlg
* 2020/8/16 11:40
*/
public class NewIOClient {
public static void main(String[] args) {
Tool.time(() -> {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9876));
FileChannel fileChannel = new FileInputStream("activity-1.0.0.jar").getChannel();
// windows环境下零拷贝只有8M
final long M_8 = 8 * 1024 * 1024L;
// 文件大小
long size = fileChannel.size();
// 计算传输次数
long count = size / M_8 + 1;
for (int i = 0; i < count; i++) {
fileChannel.transferTo(i * M_8, M_8, socketChannel);
}
// fileChannel.transferTo(0, fileChannel.size(), socketChannel);
socketChannel.close();
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
JDK 7 引入了 Asynchronous I/O,即AIO。在进行 I/O 编程中,常用两种模式:Reactor模式和Proactor模式。
Java的NIO就是Reactor模式,当有事件发生时,服务端得到通知进行相应的处理。
AIO:异步不阻塞的IO,引入异步通道的概念,采用Proactor模式,有效的请求才启动线程。特点是先由操作系统完成后才通知服务端程序启动线程去处理,适用于连接数较多且连接时长较长的应用。
BIO | NIO | AIO | |
---|---|---|---|
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 复杂 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |