NIO与BIO

Stream与Channel

stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用二者均为全双工,即读写可以同时进行

虽然Stream是单向流动的,但是它也是全双工的

IO模型

同步:线程自己去获取结果(一个线程)

例如:线程调用一个方法后,需要等待方法返回结果 异步:线程自己不去获取结果,而是由其它线程返回结果(至少两个线程)

例如:线程A调用一个方法后,继续向下运行,运行结果由线程B返回

当调用一次 channel.read 或 stream.read 后,会由用户态切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

等待数据阶段复制数据阶段

根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种

阻塞IO

用户线程进行read操作时,需要等待操作系统执行实际的read操作,此期间用户线程是被阻塞的,无法执行其他操作

非阻塞IO

用户线程 在一个循环中一直调用read方法,若内核空间中还没有数据可读,立即返回,只是在等待阶段非阻塞 用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果

多路复用

Java中通过Selector实现多路复用

当没有事件时,调用select方法会被阻塞住一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

多路复用与阻塞IO的区别

阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行

异步IO

线程1调用方法后立即返回,不会被阻塞也不需要立即获取结果当方法的运行结果出来以后,由线程2将结果返回给线程1

零拷贝

零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点

更少的用户态与内核态的切换不利用 cpu 计算,减少 cpu 缓存伪共享零拷贝适合小文件传输

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

File f = new File("helloword/data.txt");

RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];

file.read(buf);

Socket socket = ...;

socket.getOutputStream().write(buf);

内部工作流程如下

Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

用户态与内核态的切换发生了 3 次,这个操作比较重量级数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

ByteBuffer.allocate(10):底层对应 HeapByteBuffer,使用的还是 Java 内存ByteBuffer.allocateDirect(10):底层对应DirectByteBuffer,使用的是操作系统内存

大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用

这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步

DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列

当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法 通过专门线程访问引用队列,根据虚引用释放堆外内存 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化

以下两种方式都是零拷贝,即无需将数据拷贝到用户缓冲区中(JVM内存中)

底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

ava 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

这种方法下

只发生了1次用户态与内核态的切换数据拷贝了 3 次

进一步优化2

linux 2.4 对上述方法再次进行了优化

Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次

AIO

AIO 用来解决数据复制阶段的阻塞问题

同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

Windows 系统通过 IOCP 实现了真正的异步 IOLinux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

public class AIOTest {

public static void main(String[] args) throws IOException {

try {

AsynchronousFileChannel async = AsynchronousFileChannel.open(Paths.get("b.txt"), StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(16);

async.read(buffer, 0, null, new CompletionHandler() {

@Override

public void completed(Integer result, ByteBuffer attachment) {

System.out.println("read finish! "+result);

buffer.flip();

ByteBufferUtil.debugAll(buffer);

}

@Override

public void failed(Throwable exc, ByteBuffer attachment) {

System.out.println("read failed!");

}

});

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("do other things");

System.in.read();

}

}

do other things

read finish! 14

+--------+-------------------- all ------------------------+----------------+

position: [0], limit: [14]

+-------------------------------------------------+

| 0 1 2 3 4 5 6 7 8 9 a b c d e f |

+--------+-------------------------------------------------+----------------+

|00000000| 68 65 6c 6c 6f 77 6f 72 6c 64 31 32 33 34 00 00 |helloworld1234..|

+--------+-------------------------------------------------+----------------+

可以看到

响应文件读取成功的是另一个线程主线程并没有 IO 操作阻塞

网络AIO

public class AioServer {

public static void main(String[] args) throws IOException {

AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();

ssc.bind(new InetSocketAddress(8080));

ssc.accept(null, new AcceptHandler(ssc));

System.in.read();

}

private static void closeChannel(AsynchronousSocketChannel sc) {

try {

System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());

sc.close();

} catch (IOException e) {

e.printStackTrace();

}

}

private static class ReadHandler implements CompletionHandler {

private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {

this.sc = sc;

}

@Override

public void completed(Integer result, ByteBuffer attachment) {

try {

if (result == -1) {

closeChannel(sc);

return;

}

System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());

attachment.flip();

System.out.println(Charset.defaultCharset().decode(attachment));

attachment.clear();

// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件

sc.read(attachment, attachment, this);

} catch (IOException e) {

e.printStackTrace();

}

}

@Override

public void failed(Throwable exc, ByteBuffer attachment) {

closeChannel(sc);

exc.printStackTrace();

}

}

private static class WriteHandler implements CompletionHandler {

private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {

this.sc = sc;

}

@Override

public void completed(Integer result, ByteBuffer attachment) {

// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容

if (attachment.hasRemaining()) {

sc.write(attachment);

}

}

@Override

public void failed(Throwable exc, ByteBuffer attachment) {

exc.printStackTrace();

closeChannel(sc);

}

}

private static class AcceptHandler implements CompletionHandler {

private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {

this.ssc = ssc;

}

@Override

public void completed(AsynchronousSocketChannel sc, Object attachment) {

try {

System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());

} catch (IOException e) {

e.printStackTrace();

}

ByteBuffer buffer = ByteBuffer.allocate(16);

// 读事件由 ReadHandler 处理

sc.read(buffer, buffer, new ReadHandler(sc));

// 写事件由 WriteHandler 处理

sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));

// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件

ssc.accept(null, this);

}

@Override

public void failed(Throwable exc, Object attachment) {

exc.printStackTrace();

}

}

}

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: