NIO

NIO的简介

Java NIONew IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO APINIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作

与IO的区别

IO NIO
面向流(Stream Oriented) 面向缓冲区(Buffer Oriented)
阻塞IO(Blocking IO) 非阻塞IO(Non Blocking IO)
(无) 选择器(Selectors)

nio-1

传统IO是通过建立管道,把数据以流的方式发送,类似于生活中的水流,并且是单向传输

nio-2

NIO是通过创建一条通道,文件传输的对端通过把文件内容存储在缓冲区中,发送到另一端后,另一端从缓冲区中读取内容,通道类似于生活中的火车轨道,它不具备存储功能,它只负责传输,而缓冲区则负责存储。

通道和缓冲区

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。

简而言之,Channel 负责传输,Buffer 负责存储

缓冲区

缓冲区(Buffer):在 Java NIO 中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据。

缓冲区(Buffer):一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。Java NIO 中的 Buffer 主要用于与NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的

Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同(boolean 除外) ,有以下 Buffer 常用子类:ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer

上述 Buffer 类 他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个 Buffer对象:

1
2
3
4
5
6
//创建一个容量为 capacity 的 ByteBuffer 对象方法
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}

属性

Buffer 中的重要概念:

  • 容量 (capacity) :表示 Buffer 最大数据容量,缓冲区容量不能为负,并且创建后不能更改。

  • 限制 (limit):第一个不应该读取或写入的数据的索引,即位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量。

  • 位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制

  • 标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的mark() 方法指定 Buffer 中一个特定的 position,之后可以通过调用 reset()方法恢复到这个 position

标记、位置、限制、容量遵守以下不变式: 0 <= mark <= position <= limit <= capacity

nio-3

方法 描述
Buffer clear() 清空缓冲区并返回对缓冲区的引用
Buffer flip() 将缓冲区的界限设置为当前位置,并将当前位置重置为 0
int capacity() 返回 Buffer 的 capacity 大小
boolean hasRemaining() 判断缓冲区中是否还有元素
int limit() 返回 Buffer 的界限(limit) 的位置
Buffer limit(int n) 将设置缓冲区界限为 n, 并返回
Buffer mark() 对缓冲区设置标记
int position() 返回缓冲区的当前位置 position
Buffer position(int n) 将设置缓冲区的当前位置为 n , 并返回修改后的 Buffer 对象
int remaining() 返回 position 和 limit 之间的元素个数
Buffer reset() 将位置 position 转到以前设置的 mark 所在的位置
Buffer rewind() 将位置设为 0, 取消设置的 mark

Buffer 所有子类提供了两个用于数据操作的方法:get() 与 put() 方法

获取 Buffer 中的数据

  • get() :读取单个字节
  • get(byte[] dst):批量读取多个字节到 dst 中
  • get(int index):读取指定索引位置的字节(不会移动 position)

放入数据到 Buffer

  • put(byte b):将给定单个字节写入缓冲区的当前位置
  • put(byte[] src):将 src 中的字节写入缓冲区的当前位置
  • put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Test
public void test1() {
String str = "abcde";

//1. 分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);

System.out.println("-----------------allocate()----------------");
System.out.println(buf.position());//0
System.out.println(buf.limit());//1024
System.out.println(buf.capacity());//1024

//2. 利用 put() 存入数据到缓冲区中
buf.put(str.getBytes());

System.out.println("-----------------put()----------------");
System.out.println(buf.position());//5
System.out.println(buf.limit());//1024
System.out.println(buf.capacity());//1024

//3. 切换读取数据模式
buf.flip();

System.out.println("-----------------flip()----------------");
System.out.println(buf.position());//0
System.out.println(buf.limit());//5
System.out.println(buf.capacity());//1024

//4. 利用 get() 读取缓冲区中的数据
byte[] dst = new byte[buf.limit()];
buf.get(dst);
System.out.println(new String(dst, 0, dst.length));//abcde

System.out.println("-----------------get()----------------");
System.out.println(buf.position()); //5
System.out.println(buf.limit());//5
System.out.println(buf.capacity());//1024

//5. rewind() : 可重复读
buf.rewind();

System.out.println("-----------------rewind()----------------");
System.out.println(buf.position());//0
System.out.println(buf.limit());//5
System.out.println(buf.capacity());//1024

//6. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态
buf.clear();

System.out.println("-----------------clear()----------------");
System.out.println(buf.position());//0
System.out.println(buf.limit());//1024
System.out.println(buf.capacity());//1024

System.out.println((char)buf.get());//a
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Test
public void test2(){
String str = "abcde";
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(str.getBytes());

buf.flip();

byte[] dst = new byte[buf.limit()];
buf.get(dst, 0, 2);
System.out.println(new String(dst, 0, 2));//ab
System.out.println(buf.position());//2

//mark() : 标记
buf.mark();

buf.get(dst, 2, 2);
System.out.println(new String(dst, 2, 2));//cd
System.out.println(buf.position());4

//reset() : 恢复到 mark 的位置
buf.reset();
System.out.println(buf.position());//2

//判断缓冲区中是否还有剩余数据
if(buf.hasRemaining()){
//获取缓冲区中可以操作的数量
System.out.println(buf.remaining());//3
}
}

字节缓冲区

字节缓冲区(ByteBuffer)比较特殊,它能够创建两种类型的缓冲区,分别是直接缓冲区和非直接缓冲区。直接缓冲区通过APIallocateDirect()方法创建,而非直接缓冲区则是平时使用的allocate()方法。

非直接缓冲区

nio-4

正常情况下,应用程序都是在操作系统上的用户地址空间,俗称用户空间,我们的IO请求都是需要先经过用户空间,用户空间向内核空间发起IO请求,内核空间去物理磁盘加载数据,然后内核空间将加载后的数据拷贝到用户空间,用户空间才将数据返回给应用程序,也就是说,非直接缓冲区都需要经过一次数据从内核空间拷贝到用户空间的一个过程。

直接缓冲区

nio-5

当使用直接缓冲区时, Java 虚拟机会尽最大努力直接在此缓冲区上执行本机IO 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。

直接字节缓冲区可以通过调用此类的allocateDirect()工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。

直接缓冲区有一个很严重的劣势,就是当应用程序把数据存放到直接缓冲区时,它什么时候向物理磁盘触发IO操作是无法控制被应用程序控制的,它是由操作系统掌控的。

并且响应结果到应用程序的效率不固定,很有可能出现磁盘IO操作已经完成,但是应用程序尚未得到回应,导致内存数据无法被释放,当大规模的并发产生时,它带来的影响是致命的,但是不可否认的是,即时它短时间内未响应,效率也依旧比非直接缓冲区快!

所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。

直接字节缓冲区还可以通过 FileChannelmap()方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer

如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。

1
2
3
4
5
6
@Test
public void test3() {
//分配直接缓冲区
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
System.out.println(buf.isDirect()); //true
}

判断字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//使用直接缓冲区完成文件的复制(内存映射文件)
@Test
public void test4() throws IOException{
long start = System.currentTimeMillis();

FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

//内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());

//直接对缓冲区进行数据的读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);

inChannel.close();
outChannel.close();

long end = System.currentTimeMillis();
System.out.println("耗费时间为:" + (end - start));
}

通道

通道(Channel):由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。

Channel 类似于传统的 。只不过 Channel 本身不能直接访问数据,Channel 只能与Buffer 进行交互。

nio-6

在以前,内核空间请求物理磁盘都需要通过请求CPU提供的IO接口才能访问,由于这样导致CPU阻塞,利用率降低,所以分发了一块机器内存专门处理IO接口,传统IO流通过访问IO接口,并由DMA(直接存储器)向CPU申请权限许可,当许可获得后,就可以将IO操作授权给DMA全权处理,而不需要占用CPU的处理时间,大大提高了CPU的利用率。

当有大量的应用程序向操作系统发起IO请求时,会造成创建多个DMA数据总线,当DMA总线过多时,会造成总线冲突的问题。因为它们都会向CPU申请权限许可,最终也会影响CPU的性能。

nio-7

通道Channel附属于CPU中央处理器,是一个完全独立的处理器,有一套自身定义的命令和传输方式,专门用于IO操作,无需像CPU获得权限许可,和CPU彻底断绝关系,通道本质上和IO流没什么区别,只是在大型IO请求时,通道相较原来的IO流,CPU的利用率会更高。

实现类

Java 为 Channel 接口提供的最主要实现类如下:

  • FileChannel:用于读取、写入、映射和操作文件的通道。

  • DatagramChannel:通过 UDP 读写网络中的数据通道。

  • SocketChannel:通过 TCP 读写网络中的数据。

  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个SocketChannel

获取通道

获取通道的一种方式是对支持通道的对象调用getChannel()方法。支持通道的类如下:

  • FileInputStream

  • FileOutputStream

  • RandomAccessFile

  • DatagramSocket

  • Socket

  • ServerSocket

获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道。或者通过通道的静态方法 open()打开并返回指定通道。

1
2
3
4
5
6
public static FileChannel open(Path path, OpenOption... options)
throws IOException {
Set<OpenOption> set = new HashSet<OpenOption>(options.length);
Collections.addAll(set, options);
return open(path, set, NO_ATTRIBUTES);
}

其中可变参数OpenOption是指对文件的操作权限,具体可参考StandardOpenOption.class

  • StandardOpenOption.READ 只读模式
  • StandardOpenOption.WRITE 写模式
  • StandardOpenOption.CREATE 文件不存在则创建,存在则覆盖
  • StandardOpenOption.CREATE_NEW 文件不存在则创建,存在则报错

FileChannel的常用方法:

方法 描述
int read(ByteBuffer dst) 从 Channel 中读取数据到 ByteBuffer
long read(ByteBuffer[] dsts) 将 Channel 中的数据“分散”到 ByteBuffer[]
int write(ByteBuffer src) 将 ByteBuffer 中的数据写入到 Channel
long write(ByteBuffer[] srcs) 将 ByteBuffer[]中的数据“聚集”到 Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
void force(boolean metaData) 强制将所有对此通道的文件更新写入到存储设备中

数据传输

通道之间的传输可以通过write()read()接口,如下:

1
2
3
4
//将Buffer 中数据写入Channel
int bytesWritten = outChannel.write(buf);
//从 Channel 读取数据到 Buffer
int bytesRead = inChannel.read(buf);

也可以使用更简便的 transferFrom()transferTo()接口,通过直接缓冲区实现,如下:

1
2
3
4
//将数据从源通道(inChannel)传输到outChannel中
inChannel.transferTo(0, inChannel.size(), outChannel);
//与上同义
outChannel.transferFrom(inChannel, 0, inChannel.size());

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//利用通道完成文件的复制(非直接缓冲区)
@Test
public void test1() throws Exception {
long start = System.currentTimeMillis();

FileInputStream fis = new FileInputStream("1.jpg");
FileOutputStream fos = new FileOutputStream("2.jpg");
//1、获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
//2、分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//3、将通道中的数据存入缓冲区中
while(inChannel.read(buf) != -1){
buf.flip(); //切换读取数据的模式
//4、将缓冲区中的数据写入通道中
outChannel.write(buf);
buf.clear(); //清空缓冲区
}
outChannel.close();
inChannel.close();
fos.close();
fis.close();

long end = System.currentTimeMillis();
System.out.println("耗费时间为:" + (end - start));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//通道之间的数据传输(直接缓冲区)
@Test
public void test3() throws IOException{

FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

//inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());

inChannel.close();
outChannel.close();
}

分散和聚集

分散读取Scattering Reads)是指从 Channel中读取的数据分散到多个 Buffer 中,按照缓冲区的顺序,从 Channel 中读取的数据依次将 Buffer 填满。

nio-8

聚集写入Gathering Writes)是指将多个 Buffer 中的数据聚集到 Channel,按照缓冲区的顺序,写入 positionlimit 之间的数据到 Channel

nio-9

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//分散和聚集
@Test
public void test4() throws IOException{
RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw");

//1. 获取通道
FileChannel channel1 = raf1.getChannel();

//2. 分配指定大小的缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);

//3. 分散读取
ByteBuffer[] bufs = {buf1, buf2};
channel1.read(bufs);

for (ByteBuffer byteBuffer : bufs) {
byteBuffer.flip();
}

System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("-----------------");
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));

//4. 聚集写入
RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw");
FileChannel channel2 = raf2.getChannel();

channel2.write(bufs);
}

字符集

Charset用来处理字符串和字节数组相互转换的编码问题。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test5() {
Map<String, Charset> map = Charset.availableCharsets();

Set<Entry<String, Charset>> set = map.entrySet();

//输出所有支持的字符集编码格式
for (Entry<String, Charset> entry : set) {
System.out.println(entry.getKey() + "=" + entry.getValue());
}
}

用什么编码格式转义的,就需要用什么编码格式反转义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test6() throws IOException {
Charset cs1 = Charset.forName("GBK");
CharBuffer cBuf = CharBuffer.allocate(1024);
cBuf.put("测试测试!");
cBuf.flip();

// 编码
ByteBuffer bBuf = cs1.encode(cBuf);
// 解码
bBuf.flip();
CharBuffer cBuf2 = cs1.decode(bBuf);
System.out.println(cBuf2.toString());
}

阻塞

传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read()write() 时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

nio-10

当客户端向服务端起请求时,服务端调度accept()创建Socket链接,客户端发送的数据首先会先到达内核空间,这时候的服务端需要等待内核空间完全接收完数据,数据准备完成后,将数据从内核空间拷贝到用户空间。

内核空间并不知道客户端数据是否发送完毕,或者是否有新数据发送,所以只能一直等待着客户端的响应,而在内核空间等待的这段时间中,服务端一直处于阻塞状态,它在等待着内核空间将数据拷贝到用户空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//阻塞客户端
@Test
public void client() throws IOException{
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);

ByteBuffer buf = ByteBuffer.allocate(1024);

while(inChannel.read(buf) != -1){
buf.flip();
sChannel.write(buf);
buf.clear();
}
//告诉服务端已经传送完毕
//如果不加这段代码,服务端将一直阻塞,而客户端也一直阻塞等待服务端的反馈
sChannel.shutdownOutput();

//接收服务端的反馈
int len = 0;
while((len = sChannel.read(buf)) != -1){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}

inChannel.close();
sChannel.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//服务端
@Test
public void server() throws IOException{
ServerSocketChannel ssChannel = ServerSocketChannel.open();

FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

ssChannel.bind(new InetSocketAddress(9898));

SocketChannel sChannel = ssChannel.accept();

ByteBuffer buf = ByteBuffer.allocate(1024);

while(sChannel.read(buf) != -1){
buf.flip();
outChannel.write(buf);
buf.clear();
}

//发送反馈给客户端
buf.put("服务端接收数据成功".getBytes());
buf.flip();
sChannel.write(buf);

sChannel.close();
outChannel.close();
ssChannel.close();
}

非阻塞

Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

nio-11

当用户线程发起一个read操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。

客户端的所有通道都将注册到Selector选择器上,选择器将监控通道上的IO状况,当通道上的数据准备就绪后,选择器才会将这个任务分配到服务端一个或者多个线程上运行。

选择器

选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 ChannelSelector 是非阻塞 IO 的核心。

nio-12

1
2
//通过调用 Selector.open() 方法创建一个 Selector。
Selector selector = Selector.open();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();

//2. 切换非阻塞模式
ssChannel.configureBlocking(false);

//3. 绑定连接
ssChannel.bind(new InetSocketAddress(9898));

//4. 获取选择器
Selector selector = Selector.open();

//5. 将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

当调度上面的第五步的regitster接口将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。

可以监听的事件类型(可使用 SelectionKey 的四个常量表示):

  • 读 : SelectionKey.OP_READ (1)

  • 写 : SelectionKey.OP_WRITE (4)

  • 连接 : SelectionKey.OP_CONNECT (8)

  • 接收 : SelectionKey.OP_ACCEPT (16)

若注册时不止监听一个事件,则可以使用“位或”操作符连接。

1
int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey:表示 SelectableChannelSelector 之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。

Selector 常用方法:

方法 描述
Set keys() 所有的 SelectionKey 集合。代表注册在该Selector上的Channel
selectedKeys() 被选择的 SelectionKey 集合。返回此Selector的已选择键集
int select() 监控所有注册的Channel,当它们中间有需要处理的 IO 操作时,该方法返回,并将对应得的 SelectionKey 加入被选择的SelectionKey 集合中,该方法返回这些 Channel 的数量。
int select(long timeout) 可以设置超时时长的 select() 操作
int selectNow() 执行一个立即返回的 select() 操作,该方法不会阻塞线程
Selector wakeup() 使一个还未返回的 select() 方法立即返回
void close() 关闭该选择器

SelectionKey 常用方法:

方法 描述
int interestOps() 获取感兴趣事件集合
int readyOps() 获取通道已经准备就绪的操作的集合
SelectableChannel channel() 获取注册通道
Selector selector() 返回选择器
boolean isReadable() 检测 Channal 中读事件是否就绪
boolean isWritable() 检测 Channal 中写事件是否就绪
boolean isConnectable() 检测 Channel 中连接是否就绪
boolean isAcceptable() 检测 Channel 中接收是否就绪

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//客户端
@Test
public void client() throws IOException{
//1. 获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

//2. 切换非阻塞模式
sChannel.configureBlocking(false);

//3. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);

//4. 发送数据给服务端
Scanner scan = new Scanner(System.in);

while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}

//5. 关闭通道
sChannel.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//服务端
@Test
public void server() throws IOException{
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();

//2. 切换非阻塞模式
ssChannel.configureBlocking(false);

//3. 绑定连接
ssChannel.bind(new InetSocketAddress(9898));

//4. 获取选择器
Selector selector = Selector.open();

//5. 将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

//6. 轮询式的获取选择器上已经“准备就绪”的事件
while(selector.select() > 0){

//7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
Iterator<SelectionKey> it = selector.selectedKeys().iterator();

while(it.hasNext()){
//8. 获取准备“就绪”的是事件
SelectionKey sk = it.next();

//9. 判断具体是什么事件准备就绪
if(sk.isAcceptable()){
//10. 若“接收就绪”,获取客户端连接
SocketChannel sChannel = ssChannel.accept();

//11. 切换非阻塞模式
sChannel.configureBlocking(false);

//12. 将该通道注册到选择器上
sChannel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
//13. 获取当前选择器上“读就绪”状态的通道
SocketChannel sChannel = (SocketChannel) sk.channel();

//14. 读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);

int len = 0;
while((len = sChannel.read(buf)) > 0 ){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}

//15. 取消选择键 SelectionKey
it.remove();
}
}
}

管道

Java NIO 管道是2个线程之间的单向数据连接Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

nio-13

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void test1() throws IOException{
//1. 获取管道
Pipe pipe = Pipe.open();
//---------------线程A[start]----------------
//2. 将缓冲区中的数据写入管道
ByteBuffer buf = ByteBuffer.allocate(1024);

Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通过单向管道发送数据".getBytes());
buf.flip();
sinkChannel.write(buf);
//---------------线程A[end]------------------

//---------------线程B[start]----------------
//3. 读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
//---------------线程B[end]------------------
sourceChannel.close();
sinkChannel.close();
}

最后更新: 2020年05月28日 20:14

原始链接: https://midkuro.gitee.io/2020/05/28/java-nio/

× 请我吃糖~
打赏二维码