目录

一.引言

何为IO

IO的过程

Java的3种网络IO模型

阻塞和非阻塞IO

IO多路复用

异步和同步IO

二.BIO

三.NIO

1. 三大组件

Channel & Buffer

Selector

2.ByteBuffer

2.1ByteBuffer的使用

2.2ByteBuffer 结构

​2.3ByteBuffer的常用方法

分配空间  

向 buffer 写入数据

从 buffer 读取数据

字符串与 ByteBuffer 互转

分散读取、集中写入

2.4调试工具类

3.文件编程

3.1FileChannel常用方法

获取

读取

写入

关闭

3.2两个Channel之间传输数据

4.网络编程

4.1ServerSocketChannel

4.2SocketChannel

4.3Selector

创建

绑定 Channel 事件

监听 Channel 事件

4.4处理 accept 事件

4.5处理 read 事件

消息边界问题

粘包问题

半包问题

如何正确处理TCP消息边界 

client意外关闭或主动关闭时

4.6处理 write 事件

一次无法保证把 buffer 中所有数据都写入 channel

4.7注意事项

 必须处理事件

水平触发和边缘触发

事件处理完后需要在 selectedKeys 集合中移除对应的SelectionKey

ByteBuffer 大小分配

select 何时不阻塞

5.多线程优化

案例

如何拿到 cpu 个数

6.NIO vs BIO

stream vs channel

四.AIO

一.引言

何为IO

涉及计算机核心(CPU和内存)与其他设备间数据迁移的过程,就是I/O。数据输入到计算机内存的过程即输入,反之输出到外部存储(比如数据库,文件,远程主机)的过程即输出。 I/O 描述了计算机系统与外部设备之间通信的过程。

磁盘I/O

输入:就是从磁盘读取数据到内存输出:将内存中的数据写入磁盘网络I/O

输入:从网络中的另一台计算机或服务器获取数据,并将其加载到本地内存中输出:将本地内存中的数据发送到网络中的其他计算机或服务器

IO的过程

根据大学里学到的操作系统相关的知识:为了保证操作系统的稳定性和安全性,一个进程的地址空间划分为 用户空间(User space) 和 内核空间(Kernel space ) 。

像我们平常运行的应用程序都是运行在用户空间,只有内核空间才能进行系统态级别的资源有关的操作,比如文件管理、进程通信、内存管理等等,因为这些都是比较危险的操作,不可以由应用程序乱来,只能交给底层操作系统来。也就是说,我们想要进行 IO 操作,只能发起系统调用请求操作系统来间接访问内核空间。

我们在平常开发过程中接触最多的就是 磁盘 IO(读写文件) 和 网络 IO(网络请求和响应)。

从应用程序的视角来看的话,我们的应用程序对操作系统的内核发起 IO 调用(系统调用),操作系统负责的内核执行具体的 IO 操作。也就是说,我们的应用程序实际上只是发起了 IO 操作的调用而已,具体 IO 的执行是由操作系统的内核来完成的。

当应用程序发起 I/O 调用后,会经历两个步骤(IO执行):

数据准备:内核等待 I/O 设备准备好数据,即操作系统将外部数据加载到内核缓冲区数据拷贝:内核将数据从内核缓冲区拷贝到用户进程缓冲区。

Java的3种网络IO模型

Java中提供的IO有关的API,也是依赖操作系统层面的IO操作实现的。在Java中,主要有三种IO模型,分别是阻塞IO(BIO)、非阻塞IO(NIO)和 异步IO(AIO)。

可以把Java中的BIO、NIO和AIO理解为是Java语言对操作系统的5种IO模型的封装(在Linux(UNIX)操作系统中,共有五种IO模型,分别是:阻塞IO模型、非阻塞IO模型、IO复用模型、信号驱动IO模型以及异步IO模型)。程序员在使用这些API的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码。只需要使用Java的API就可以了。

阻塞和非阻塞IO

上面已经说过,应用程序的IO实际是分为两个步骤,IO调用和IO执行。IO调用是由进程发起,IO执行是操作系统的工作。操作系统的IO情况决定了进程IO调用是否能够得到立即响应。

阻塞IO:如果操作系统尚未准备好数据,当前进程或线程一直等待直到其就绪非阻塞IO:如果操作系统尚未准备好数据,进程或线程并不一直等待其就绪,而是可以做其他事情。进程/线程会周期性地轮询或查询IO操作的状态,以确定数据是否就绪。(因为需要频繁地从用户态切换到内核态,事实上并未比阻塞IO好多少)

IO多路复用

IO多路复用模型,就是通过一种新的系统调用,一个进程可以监视多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核能够通知程序进行相应的IO系统调用。

Java实现IO多路复用的基本原理:通过select/epoll系统调用,单个线程不断地轮询select/epoll系统调用所负责的成百上千的socket连接,当某个或某些socket网络有连接数据到达了,就返回这些可以读写的连接。好处就显而易见了,通过一个系统调用,就可以查询到可以读写的一个甚至多个网络连接。

select 调用:内核提供的系统调用,它支持一次查询多个系统调用的可用状态。几乎所有的操作系统都支持。epoll 调用:属于 select 调用的增强版本,优化了 IO 的执行效率

阻塞 IO vs 多路复用

异步和同步IO

同步IO::线程自己去获取结果(一个线程,发起和返回结果都是自己)。所以阻塞IO和非阻塞IO包括IO多路复用都是同步的异步IO:线程自己不去获取结果,而是由其它线程返回结果(至少两个线程) 。异步模式下一定是非阻塞的,所以不存在异步阻塞。异步IO是指程序发起IO操作后,它可以继续执行其他任务,而不必等待IO操作完成。当IO操作完成后,程序会得到通知,可以处理已完成的IO操作。异步IO可以提高程序的并发性和响应性,因为它允许程序在等待IO的同时执行其他任务。

自己的理解:我感觉阻塞和非阻塞IO针对的是操作系统未准备好数据时进程的处理方式,是等待还是不等待。异步和同步IO针对的是IO操作未完成时(IO操作包括数据准备和数据拷贝两步骤)进程的处理方式,是等待还是不等待。

二.BIO

Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.ioBIO(blocking I/O) :同步阻塞 IO 模型 ,即在读写数据过程中会发生阻塞现象,直至有可供读取的数据或者数据能够写入。

服务器实现模式为

一个连接一个线程,即客户端有连接请求时服务器端就需 要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)

映射到Linux操作系统中,这就是一种最简单的IO模型,即阻塞IO。 阻塞 I/O 是最简单的 I/O 模型,一般表现为进程或线程等待某个条件,如果条件不满足,则一直等下去。条件满足,则进行下一步操作。

BIO客户端、服务端通信实现  

Server 服务端

/**

目标:实现服务端可以同时接收多个客户端的Socket通信需求。

思路:是服务端每接收到一个客户端socket请求对象之后都交给一个独立的线程来处理客户端的数据交互需求。

*/

public class Server {

public static void main(String[] args) {

try {

// 1、注册端口

ServerSocket ss = new ServerSocket(9999);

// 2、定义一个死循环,负责不断的接收客户端的Socket链接请求

while(true){

Socket socket = ss.accept();

// 3、创建一个独立的线程来处理与这个客户端的socket通信需求。

new ServerThreadReader(socket).start();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

ServerThreadReader 服务端与客户端保持通信的线程

public class ServerThreadReader extends Thread {

private Socket socket;

public ServerThreadReader(Socket socket){

this.socket = socket;

}

@Override

public void run() {

try {

// 从socket对象中得到一个字节输入流

InputStream is = socket.getInputStream();

// 使用缓冲字符输入流包装字节输入流

BufferedReader br = new BufferedReader(new InputStreamReader(is));

String msg;

while((msg = br.readLine())!=null){

System.out.println(msg);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

Client 客户端

/**

客户端

*/

public class Client {

public static void main(String[] args) {

try {

// 1、请求与服务端的Socket对象链接

Socket socket = new Socket("127.0.0.1" , 9999);

// 2、得到一个打印流

PrintStream ps = new PrintStream(socket.getOutputStream());

// 3、使用循环不断的发送消息给服务端接收

Scanner sc = new Scanner(System.in);

while(true){

System.out.print("请说:");

String msg = sc.nextLine();

ps.println(msg);

ps.flush();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

三.NIO

Java NIO(non-blocking)是从Java 1.4版本开始引入的一个新的IO API,NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行读写操作。

Java NIO(non-blocking) 映射的不是操作系统五大IO模型中的NIO模型(采用轮询的方式检查IO状态),而是另外的一种模型,叫做IO多路复用模型( IO multiplexing )。

Java 中的 NIO ,有一个非常重要的选择器 ( Selector ) 的概念,也可以被称为 多路复用器。通过它,只需要一个线程便可以管理多个客户端连接。当客户端数据到了之后,才会为其服务。

NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)

1. 三大组件

Channel & Buffer

channel 有一点类似于 流,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 流 要么是输入,要么是输出,channel 比 流 更为底层

常见的 Channel 有

FileChannel (文件):从文件中读写数据。DatagramChannel (UDP):能通过 UDP 读写网络中的数据。SocketChannel(TCP Client):能通过 TCP 读写网络中的数据。ServerSocketChannel(TCP Server):可以监听新进来的 TCP 连接,像 Web 服务器那样。对每一个新进来的连接都会创建一个 SocketChannel

buffer 则用来缓冲读写数据,常见的 buffer 有

ByteBuffer(用的最多)

MappedByteBufferDirectByteBufferHeapByteBufferShortBufferIntBufferLongBufferFloatBufferDoubleBufferCharBuffer

Selector

selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

2.ByteBuffer

2.1ByteBuffer的使用

向 buffer 写入数据,例如调用 channel.read(buffer)调用 flip() 切换至读模式从 buffer 读取数据,例如调用 buffer.get()调用 compact() 或 clear() 切换至写模式,compact()会自动压缩未读的,clear()则会直接清空一次可能读不完,重复 1~4 步骤读,

@Slf4j

public class TestByteBuffer {

public static void main(String[] args) {

// FileChannel 获得方式

// 1. 输入输出流, 2. RandomAccessFile

try (FileChannel channel = new RandomAccessFile("D:\\data.txt", "rw").getChannel()) {

// 准备缓冲区,指定容量后不可更改

ByteBuffer buffer = ByteBuffer.allocate(10);

while(true) {

// 从 channel 读取数据,向 buffer 写入

int len = channel.read(buffer);

log.debug("读取到的字节数 {}", len);

if(len == -1) { // 没有内容了

break;

}

// 打印 buffer 的内容

buffer.flip(); // 切换至读模式

while(buffer.hasRemaining()) { // 是否还有剩余未读数据

byte b = buffer.get();//get()会改变读指针,但get(i)不会,直接根据索引查找位置

log.debug("实际字节 {}", (char) b);

}

buffer.clear(); // 切换为写模式

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

2.2ByteBuffer 结构

ByteBuffer的结构可以看成一个连续的数组,有以下重要属性

capacity:容量position:起始位置limit:写入/读取限制位置

一开始

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态  

flip 动作发生后,position 切换为读取位置,limit 切换为读取限制  

读取 4 个字节后,状态

clear 动作发生后,状态

compact 方法,是把未读完的部分向前压缩,使position变成剩余未读的字节数,然后切换至写模式

2.3ByteBuffer的常用方法

分配空间  

分配容量后就不可修改了

ByteBuffer byteBuffer1 = ByteBuffer.allocate(容量);//class java.nio.HeapByteBuffer

ByteBuffer byteBuffer2 = ByteBuffer.allocateDirect(容量);//class java.nio.DirectByteBuffer

两种方法返回的实现类不同:

HeapByteBuffer:分配在 java 堆内存,读写效率较低,受到 GC(垃圾回收) 的影响DirectByteBuffer:通过调用本地操作系统的内存管理机制来分配堆外内存,读写效率高(不需要通过额外的复制操作将数据从堆内存复制到物理内存),不会受 GC 影响,但分配的效率低,并且如果释放不完全会造成内存泄漏

向 buffer 写入数据

有两种办法

调用 channel 的 read 方法调用 buffer 自己的 put 方法

从 buffer 读取数据

同样有两种办法

调用 channel 的 write 方法调用 buffer 自己的 get 方法

get 方法会让 position 读指针向后走,如果想重复读取数据

可以调用 rewind 方法将 position 重新置为 0或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

字符串与 ByteBuffer 互转

两种方法:

ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");

ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

debug(buffer1);

debug(buffer2);

CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);

System.out.println(buffer3.getClass());

System.out.println(buffer3.toString());

Buffer 是非线程安全的

分散读取、集中写入

2.4调试工具类

netty依赖

io.netty

netty-all

4.1.51.Final

import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;

import static io.netty.util.internal.MathUtil.isOutOfBounds;

import static io.netty.util.internal.StringUtil.NEWLINE;

public class ByteBufferUtil {

private static final char[] BYTE2CHAR = new char[256];

private static final char[] HEXDUMP_TABLE = new char[256 * 4];

private static final String[] HEXPADDING = new String[16];

private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];

private static final String[] BYTE2HEX = new String[256];

private static final String[] BYTEPADDING = new String[16];

static {

final char[] DIGITS = "0123456789abcdef".toCharArray();

for (int i = 0; i < 256; i++) {

HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];

HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];

}

int i;

// Generate the lookup table for hex dump paddings

for (i = 0; i < HEXPADDING.length; i++) {

int padding = HEXPADDING.length - i;

StringBuilder buf = new StringBuilder(padding * 3);

for (int j = 0; j < padding; j++) {

buf.append(" ");

}

HEXPADDING[i] = buf.toString();

}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).

for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {

StringBuilder buf = new StringBuilder(12);

buf.append(NEWLINE);

buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));

buf.setCharAt(buf.length() - 9, '|');

buf.append('|');

HEXDUMP_ROWPREFIXES[i] = buf.toString();

}

// Generate the lookup table for byte-to-hex-dump conversion

for (i = 0; i < BYTE2HEX.length; i++) {

BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);

}

// Generate the lookup table for byte dump paddings

for (i = 0; i < BYTEPADDING.length; i++) {

int padding = BYTEPADDING.length - i;

StringBuilder buf = new StringBuilder(padding);

for (int j = 0; j < padding; j++) {

buf.append(' ');

}

BYTEPADDING[i] = buf.toString();

}

// Generate the lookup table for byte-to-char conversion

for (i = 0; i < BYTE2CHAR.length; i++) {

if (i <= 0x1f || i >= 0x7f) {

BYTE2CHAR[i] = '.';

} else {

BYTE2CHAR[i] = (char) i;

}

}

}

/**

* 打印所有内容

* @param buffer

*/

public static void debugAll(ByteBuffer buffer) {

int oldlimit = buffer.limit();

buffer.limit(buffer.capacity());

StringBuilder origin = new StringBuilder(256);

appendPrettyHexDump(origin, buffer, 0, buffer.capacity());

System.out.println("+--------+-------------------- all ------------------------+----------------+");

System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);

System.out.println(origin);

buffer.limit(oldlimit);

}

/**

* 打印可读取内容

* @param buffer

*/

public static void debugRead(ByteBuffer buffer) {

StringBuilder builder = new StringBuilder(256);

appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());

System.out.println("+--------+-------------------- read -----------------------+----------------+");

System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());

System.out.println(builder);

}

public static void main(String[] args) {

ByteBuffer buffer = ByteBuffer.allocate(10);

buffer.put(new byte[]{97, 98, 99, 100});

debugAll(buffer);

}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {

if (isOutOfBounds(offset, length, buf.capacity())) {

throw new IndexOutOfBoundsException(

"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length

+ ") <= " + "buf.capacity(" + buf.capacity() + ')');

}

if (length == 0) {

return;

}

dump.append(

" +-------------------------------------------------+" +

NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +

NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;

final int fullRows = length >>> 4;

final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.

for (int row = 0; row < fullRows; row++) {

int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.

appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump

int rowEndIndex = rowStartIndex + 16;

for (int j = rowStartIndex; j < rowEndIndex; j++) {

dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);

}

dump.append(" |");

// ASCII dump

for (int j = rowStartIndex; j < rowEndIndex; j++) {

dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);

}

dump.append('|');

}

// Dump the last row which has less than 16 bytes.

if (remainder != 0) {

int rowStartIndex = (fullRows << 4) + startIndex;

appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump

int rowEndIndex = rowStartIndex + remainder;

for (int j = rowStartIndex; j < rowEndIndex; j++) {

dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);

}

dump.append(HEXPADDING[remainder]);

dump.append(" |");

// Ascii dump

for (int j = rowStartIndex; j < rowEndIndex; j++) {

dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);

}

dump.append(BYTEPADDING[remainder]);

dump.append('|');

}

dump.append(NEWLINE +

"+--------+-------------------------------------------------+----------------+");

}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {

if (row < HEXDUMP_ROWPREFIXES.length) {

dump.append(HEXDUMP_ROWPREFIXES[row]);

} else {

dump.append(NEWLINE);

dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));

dump.setCharAt(dump.length() - 9, '|');

dump.append('|');

}

}

public static short getUnsignedByte(ByteBuffer buffer, int index) {

return (short) (buffer.get(index) & 0xFF);

}

3.文件编程

3.1FileChannel常用方法

FileChannel 只能工作在阻塞模式下,其他与网络有关的Channel则有阻塞模式与非阻塞模式两种

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

通过 FileInputStream 获取的 channel 只能读通过 FileOutputStream 获取的 channel 只能写通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式(rw)决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

int readBytes = channel.read(buffer);

写入

ByteBuffer buffer = ...;

buffer.put(...); // 存入数据

buffer.flip(); // 切换读模式

while(buffer.hasRemaining()) {

channel.write(buffer);

}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

3.2两个Channel之间传输数据

超过 2g 大小的文件传输:

transferTo(起始位置,传输数,传输目标地)

public class TestFileChannelTransferTo {

public static void main(String[] args) {

try (

FileChannel from = new FileInputStream("data.txt").getChannel();

FileChannel to = new FileOutputStream("to.txt").getChannel();

) {

// 效率高,底层会利用操作系统的零拷贝进行优化

long size = from.size();

// left 变量代表还剩余多少字节

for (long left = size; left > 0; ) {

System.out.println("position:" + (size - left) + " left:" + left);

left -= from.transferTo((size - left), left, to);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

4.网络编程

4.1ServerSocketChannel

Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道, 就像标准IO中的ServerSocket一样。ServerSocketChannel类在 java.nio.channels包中。

ServerSocketChannel

ServerSocketChannel是非阻塞的,这意味着它可以在没有数据可用的情况下立即返回,而不必等待数据到达。ServerSocketChannel通常与Selector一起使用,可以使用单个线程处理多个通道的I/O操作。ServerSocket

ServerSocket是阻塞的,它会一直等待,直到有连接请求到达。ServerSocket通常在一个独立的线程中等待连接请求,并为每个连接创建一个新的线程进行处理。

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(8080));

ServerSocketChannel可以设置成非阻塞模式(默认为阻塞模式)。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。如:  

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(8080));

serverSocketChannel.configureBlocking(false);

while(true){

SocketChannel socketChannel = serverSocketChannel.accept();

if(socketChannel != null){

//使用socketChannel做一些工作...

}

}

4.2SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。SocketChannel和标准IO中的Socket是两种不同的网络编程方式。

SocketChannel

SocketChannel是非阻塞的,可以在没有数据可用的情况下立即返回。SocketChannel提供了读取和写入缓冲区的方法,可以使用直接缓冲区(Direct Buffer)或者间接缓冲区(Heap Buffer)进行数据传输。SocketChannel通常与Selector一起使用,可以使用单个线程处理多个通道的I/O操作。Socket

Socket是阻塞的,读取和写入操作会一直等待,直到有数据可用或操作完成。Socket使用InputStream和OutputStream进行读写操作。Socket通常在一个独立的线程中进行读写操作。

可以通过以下2种方式创建SocketChannel:  

打开一个SocketChannel并连接到互联网上的某台服务器。一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。  

SocketChannel socketChannel = SocketChannel.open();

socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

4.3Selector

选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心

创建

创建 Selector :通过调用 Selector.open() 方法创建一个 Selector。

Selector selector = Selector.open();

绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

channel.configureBlocking(false);//channel默认为阻塞模式,需要手动设置

SelectionKey key = channel.register(selector,绑定事件,附件);

注册到selector上的channel 必须工作在非阻塞模式,FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。可以监听的事件类型(用 可使用 SelectionKey 的四个常量 表示):

读 : SelectionKey.OP_READ (1)写 : SelectionKey.OP_WRITE (4)连接 : SelectionKey.OP_CONNECT (8)接收 : SelectionKey.OP_ACCEPT (16)

若注册时不止监听一个事件,则可以使用“位或”操作符连接。

int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE

第三个参数为附件,一般为bytebuffer。服务器在读写 bytebuffer 时很可能一次读取不完,需要多次读取,所以每次读写时需要做到数据共享。但我们又需要保证每个socketchannel都有自己的bytebuffer,则可以通过附件来指定。

监听 Channel 事件

方法1,阻塞直到绑定事件发生

int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

int count = selector.selectNow();

4.4处理 accept 事件

client

public class Client {

public static void main(String[] args) {

try (Socket socket = new Socket("localhost", 8080)) {

System.out.println(socket);

socket.getOutputStream().write("world".getBytes());

System.in.read();

} catch (IOException e) {

e.printStackTrace();

}

}

}

Server

@Slf4j

public class Server{

public static void main(String[] args) {

try (ServerSocketChannel channel = ServerSocketChannel.open()) {

channel.bind(new InetSocketAddress(8080));

System.out.println(channel);

Selector selector = Selector.open();

channel.configureBlocking(false);

channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {

int count = selector.select();

log.debug("select count: {}", count);

// 获取所有事件

Set keys = selector.selectedKeys();

// 遍历所有事件,逐一处理

Iterator iter = keys.iterator();

while (iter.hasNext()) {

SelectionKey key = iter.next();

// 判断事件类型

if (key.isAcceptable()) {

ServerSocketChannel c = (ServerSocketChannel) key.channel();

// 必须处理

SocketChannel sc = c.accept();

log.debug("{}", sc);

}

// 处理完毕,必须将事件移除

iter.remove();

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

4.5处理 read 事件

 Server

@Slf4j

public class Server {

private static void split(ByteBuffer source) {

source.flip();

for (int i = 0; i < source.limit(); i++) {

// 找到一条完整消息

if (source.get(i) == '\n') {

int length = i + 1 - source.position();

// 把这条完整消息存入新的 ByteBuffer

ByteBuffer target = ByteBuffer.allocate(length);

// 从 source 读,向 target 写

for (int j = 0; j < length; j++) {

target.put(source.get());

}

debugAll(target);

}

}

source.compact(); // 0123456789abcdef position 16 limit 16

}

public static void main(String[] args) throws IOException {

// 1. 创建 selector, 管理多个 channel

Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.bind(new InetSocketAddress(8080));

ssc.configureBlocking(false);

// 2. 建立 selector 和 channel 的联系(注册)

// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件

SelectionKey sscKey = ssc.register(selector, 0, null);//selector、关注事件、附件

// key 只关注 accept 事件

sscKey.interestOps(SelectionKey.OP_ACCEPT);

log.debug("sscKey:{}", sscKey);

while (true) {

// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行

// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理

selector.select();

// 4. 处理事件, selectedKeys 内部包含了所有发生的事件

Iterator iter = selector.selectedKeys().iterator(); // accept, read

while (iter.hasNext()) {

SelectionKey key = iter.next();

// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题

iter.remove();

log.debug("key: {}", key);

// 5. 区分事件类型

if (key.isAcceptable()) { // 如果是 accept

ServerSocketChannel channel = (ServerSocketChannel) key.channel();

SocketChannel sc = channel.accept();

sc.configureBlocking(false);

ByteBuffer buffer = ByteBuffer.allocate(16); // attachment

// 将一个 byteBuffer 作为附件关联到 selectionKey 上

SelectionKey scKey = sc.register(selector, 0, buffer);//selector、关注事件、附件

scKey.interestOps(SelectionKey.OP_READ);

log.debug("{}", sc);

log.debug("scKey:{}", scKey);

} else if (key.isReadable()) { // 如果是 read

try {

SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel

// 获取 selectionKey 上关联的附件

ByteBuffer buffer = (ByteBuffer) key.attachment();

int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1

if(read == -1) {

key.cancel();

} else {

split(buffer);

// 需要扩容

if (buffer.position() == buffer.limit()) {

ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);

buffer.flip();

newBuffer.put(buffer); // 0123456789abcdef3333\n

key.attach(newBuffer);//关联新的ByteBuffer

}

}

} catch (IOException e) {

e.printStackTrace();

key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)

}

}

}

}

}

}

如果单个消息大于bytebuffer则会进行扩容,但只会越来越大而不会变小,这实际上会造成空间浪费。而Netty在bytebuffer上做了优化,能够自适应的调整bytebuffer的大小

Client

public class Client {

public static void main(String[] args) throws IOException {

SocketChannel sc = SocketChannel.open();

sc.connect(new InetSocketAddress("localhost", 8080));

SocketAddress address = sc.getLocalAddress();

// sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));

sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));

System.in.read();

}

}

消息边界问题

TCP是基于字节流的协议,这意味着在传输过程中没有明确的消息边界。发送方将数据流划分为TCP段,但接收方无法直接知道发送方在发送的每个TCP段的边界位置。接收方只能接收到连续的字节流,并需要根据应用层协议或其他手段来确定消息边界。

粘包问题

从TCP的这些细节我们可以看出,由于TCP是面向字节流的协议,不论是在发送端,传输链路,还是在接收端,多个TCP数据包,都有可能被合并成一个。这种特点会给我们造成一些困扰,比如我们发送了两条消息“hello”和“world”,预期应该是这样的

但是有可能会变成这样子

到这里我们可以看出,所谓的粘包问题,实际上问的是:TCP的上层应用如何正确处理消息边界。

具体来讲,不是所有的粘包现象都需要处理,若传输的数据为不带结构的连续流数据(如文件传输),则不必把粘连的包分开(简称分包)。但在实际工程应用中,传输的数据一般为带结构的数据,这时就需要做分包处理。

半包问题

消息边界问题还可能会带来半包问题,半包问题是指在数据传输中,数据包没有完整地传输完成就被接收端接收到,造成接收到的数据包不完整,即"半包"。这可能会导致数据不完整或无法正确解析。

如何正确处理TCP消息边界 

①使用标准的应用层协议(比如:http、https)来封装要传输的不定长的数据包

②在每条数据的尾部添加特殊字符, 如果遇到特殊字符, 代表当条数据接收完毕了

缺点: 效率低, 需要一个字节一个字节接收, 接收一个字节判断一次, 判断是不是那个特殊字符串

③在发送数据块之前, 在数据块最前边添加一个固定大小的数据头, 这时候数据由两部分组成:数据头+数据块

数据头:存储当前数据包的总字节数,接收端先接收数据头,然后在根据数据头接收对应大小的字节 数据块:当前数据包的内容

④固定长度:发送方将每个消息固定为相同的长度,不足部分使用特定的填充字符。接收方按照固定长度来切分接收到的字节流,并去除填充字符。

缺点: 浪费带宽和空间

client意外关闭或主动关闭时

client无论是意外关闭还是主动关闭都会触发一次读事件,如果不做相应处理会使服务端异常

意外关闭:在异常处理时记得关闭 事件

主动关闭:根据read的返回值判断关闭的读事件

4.6处理 write 事件

server

public class WriteServer {

public static void main(String[] args) throws IOException {

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.configureBlocking(false);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);

ssc.bind(new InetSocketAddress(8080));

while (true) {

selector.select();

Iterator iter = selector.selectedKeys().iterator();

while (iter.hasNext()) {

SelectionKey key = iter.next();

iter.remove();

if (key.isAcceptable()) {

SocketChannel sc = ssc.accept();

sc.configureBlocking(false);

SelectionKey sckey = sc.register(selector, 0, null);

sckey.interestOps(SelectionKey.OP_READ);

// 1. 向客户端发送大量数据

StringBuilder sb = new StringBuilder();

for (int i = 0; i < 5000000; i++) {

sb.append("a");

}

ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());

// 2. 返回值代表实际写入的字节数

int write = sc.write(buffer);

System.out.println(write);

// 3. 判断是否有剩余内容

if (buffer.hasRemaining()) {

// 4. 关注可写事件 1 4

sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);

// sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);

// 5. 把未写完的数据挂到 sckey 上

sckey.attach(buffer);

}

} else if (key.isWritable()) {

ByteBuffer buffer = (ByteBuffer) key.attachment();

SocketChannel sc = (SocketChannel) key.channel();

int write = sc.write(buffer);

System.out.println(write);

// 6. 清理操作

if (!buffer.hasRemaining()) {

key.attach(null); // 需要清除buffer

key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);//不需关注可写事件

}

}

}

}

}

}

client

public class WriteClient {

public static void main(String[] args) throws IOException {

SocketChannel sc = SocketChannel.open();

sc.connect(new InetSocketAddress("localhost", 8080));

// 3. 接收数据

int count = 0;

while (true) {

ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);

count += sc.read(buffer);

System.out.println(count);

buffer.clear();

}

}

}

一次无法保证把 buffer 中所有数据都写入 channel

非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

当消息处理器第一次写入消息时,才将 channel 注册到 selector 上selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册如果不取消,会每次可写均会触发 write 事件

4.7注意事项

 必须处理事件

SelectionKey sscKey = ssc.register(selector, 0, null);//selector、关注事件(0表示不关注任何事件)、附件

// key 只关注 accept 事件,如果之前关注了其他事件会覆盖,可以用 + 或 | 连接

sscKey.interestOps(SelectionKey.OP_ACCEPT);

事件发生后,要么处理,要么取消(cancel,cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件 ),不能什么都不做,否则下次该事件仍会触发(selector.select()不会阻塞),这是因为 nio 底层使用的是水平触发。

水平触发和边缘触发

水平触发:套接字上只要有数据,就一直触发边缘触发:套接字上有数据到来,才触发一次事件,无论数据是否读/写完

事件处理完后需要在 selectedKeys 集合中移除对应的SelectionKey

 因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如:

第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

ByteBuffer 大小分配

每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBufferByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能。另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

select 何时不阻塞

事件发生时

客户端发起连接请求,会触发 accept 事件客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件channel 可写,会触发 write 事件在 linux 下 nio bug 发生时调用 selector.wakeup()调用 selector.close()selector 所在线程 interrupt

5.多线程优化

之前讲的都是用单线程配合selector来管理多个channel上的事件,虽然可行但现在都是多核CPU,其他CPU没有被充分利用,也是一种资源浪费。并且单线程处理的时候,如果在处理某件事上耗费的时间过长,那么对其他事件的处理也有影响。Redis是单线程,底层采用了IO多路复用和NIO模型类似,所以也有这个缺点。

我们可以分两组选择器

单线程配一个选择器,专门处理 accept 事件创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

如果selector已经处于阻塞状态( selector.select() ),则无法将channel注册到selector上。我们如果想在线程之间传递数据的话可以用 队列。

案例

server

@Slf4j

public class MultiThreadServer {

public static void main(String[] args) throws IOException {

Thread.currentThread().setName("boss");

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.configureBlocking(false);

Selector boss = Selector.open();

SelectionKey bossKey = ssc.register(boss, 0, null);

bossKey.interestOps(SelectionKey.OP_ACCEPT);

ssc.bind(new InetSocketAddress(8080));

// 1. 创建固定数量的 worker 并初始化

Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];

for (int i = 0; i < workers.length; i++) {

workers[i] = new Worker("worker-" + i);

}

//计数器,第一次 get 是 0

AtomicInteger index = new AtomicInteger();

while(true) {

boss.select();

Iterator iter = boss.selectedKeys().iterator();

while (iter.hasNext()) {

SelectionKey key = iter.next();

iter.remove();

if (key.isAcceptable()) {

SocketChannel sc = ssc.accept();

sc.configureBlocking(false);

log.debug("connected...{}", sc.getRemoteAddress());

// 2. 关联 selector

log.debug("before register...{}", sc.getRemoteAddress());

// round robin 轮询

workers[index.getAndIncrement() % workers.length].register(sc); // boss 调用 初始化 selector , 启动 worker-0

log.debug("after register...{}", sc.getRemoteAddress());

}

}

}

}

static class Worker implements Runnable{

private Thread thread;

private Selector selector;

private String name;

private volatile boolean start = false; // 还未初始化

private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {

this.name = name;

}

// 初始化线程,和 selector

public void register(SocketChannel sc) throws IOException { //还是boss线程在调用

if(!start) {

selector = Selector.open();

thread = new Thread(this, name);

thread.start();

start = true;

}

//方法一:队列传递数据。向队列里添加任务,但这个任务并没有立即执行

queue.add(()->{

try {

sc.register(selector, SelectionKey.OP_READ, null);

} catch (ClosedChannelException e) {

e.printStackTrace();

}

});

selector.wakeup(); // 唤醒 select 方法

//方法二:直接使用 wakeup

// selector.wakeup(); // 唤醒 select 方法

// sc.register(selector, SelectionKey.OP_READ, null);

}

@Override

public void run() {

while(true) {

try {

selector.select(); // worker-0 阻塞

Runnable task= queue.poll();

if(task!=null){

task.run();//执行 sc.register(selector, SelectionKey.OP_READ, null);

}

Iterator iter = selector.selectedKeys().iterator();

while (iter.hasNext()) {

SelectionKey key = iter.next();

iter.remove();

if (key.isReadable()) {

//这里只是粗略的进行了读处理,并未解决消息边界、客户端停止连接等问题

ByteBuffer buffer = ByteBuffer.allocate(16);

SocketChannel channel = (SocketChannel) key.channel();

log.debug("read...{}", channel.getRemoteAddress());

channel.read(buffer);

buffer.flip();

debugAll(buffer);

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

client

public class Client {

public static void main(String[] args) throws IOException {

SocketChannel sc = SocketChannel.open();

sc.connect(new InetSocketAddress("localhost", 8080));

SocketAddress address = sc.getLocalAddress();

sc.write(Charset.defaultCharset().encode("0123456789abcdef"));

System.in.read();

}

}

如何拿到 cpu 个数

Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

6.NIO vs BIO

stream vs channel

stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用二者均为全双工,即读写可以同时进行

四.AIO

AIO 用来解决数据复制阶段的阻塞问题

同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

Windows 系统通过 IOCP 实现了真正的异步 IOLinux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势。而我们的项目最终又需要部署到liunx服务器上,所以异步IO看起来很好,但实际用的很少。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: