NIO
NIO的简介
Java NIO
(New IO
)是从Java 1.4版本开始引入的一个新的IO API
,可以替代标准的Java IO API
。NIO
与原来的IO
有同样的作用和目的,但是使用的方式完全不同,NIO
支持面向缓冲区的、基于通道的IO
操作。NIO将以更加高效的方式进行文件的读写操作。
与IO的区别
IO |
NIO |
面向流(Stream Oriented) |
面向缓冲区(Buffer Oriented) |
阻塞IO(Blocking IO) |
非阻塞IO(Non Blocking IO) |
(无) |
选择器(Selectors) |
data:image/s3,"s3://crabby-images/24f64/24f644692ff2cda362780fb60328ef19e9cf5153" alt="nio-1"
传统IO是通过建立管道,把数据以流的方式发送,类似于生活中的水流,并且是单向传输。
data:image/s3,"s3://crabby-images/254c6/254c6fce39b2b245a235d6a17976bf9d617ca8bb" alt="nio-2"
而NIO
是通过创建一条通道,文件传输的对端通过把文件内容存储在缓冲区中,发送到另一端后,另一端从缓冲区中读取内容,通道类似于生活中的火车轨道,它不具备存储功能,它只负责传输,而缓冲区则负责存储。
通道和缓冲区
Java NIO
系统的核心在于:通道(Channel
)和缓冲区(Buffer
)。通道表示打开到 IO
设备(例如:文件、套接字)的连接。若需要使用 NIO
系统,需要获取用于连接 IO
设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
简而言之,Channel
负责传输,Buffer
负责存储。
缓冲区
缓冲区(Buffer
):在 Java NIO
中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据。
缓冲区(Buffer
):一个用于特定基本数据类型的容器。由 java.nio
包定义的,所有缓冲区都是 Buffer
抽象类的子类。Java NIO
中的 Buffer
主要用于与NIO
通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的。
Buffer
就像一个数组,可以保存多个相同类型的数据。根据数据类型不同(boolean
除外) ,有以下 Buffer
常用子类:ByteBuffer
、CharBuffer
、ShortBuffer
、IntBuffer
、LongBuffer
、FloatBuffer
、DoubleBuffer
。
上述 Buffer
类 他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个 Buffer
对象:
1 2 3 4 5 6
| public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
|
属性
Buffer
中的重要概念:
容量 (capacity) :表示 Buffer
最大数据容量,缓冲区容量不能为负,并且创建后不能更改。
限制 (limit):第一个不应该读取或写入的数据的索引,即位于limit
后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量。
位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制
标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer
中的mark()
方法指定 Buffer
中一个特定的 position
,之后可以通过调用 reset()
方法恢复到这个 position
。
标记、位置、限制、容量遵守以下不变式: 0 <= mark <= position <= limit <= capacity。
data:image/s3,"s3://crabby-images/3298e/3298e0c8b7ca4ada220fd3f0e2ea1579c81d30df" alt="nio-3"
方法 |
描述 |
Buffer clear() |
清空缓冲区并返回对缓冲区的引用 |
Buffer flip() |
将缓冲区的界限设置为当前位置,并将当前位置重置为 0 |
int capacity() |
返回 Buffer 的 capacity 大小 |
boolean hasRemaining() |
判断缓冲区中是否还有元素 |
int limit() |
返回 Buffer 的界限(limit) 的位置 |
Buffer limit(int n) |
将设置缓冲区界限为 n, 并返回 |
Buffer mark() |
对缓冲区设置标记 |
int position() |
返回缓冲区的当前位置 position |
Buffer position(int n) |
将设置缓冲区的当前位置为 n , 并返回修改后的 Buffer 对象 |
int remaining() |
返回 position 和 limit 之间的元素个数 |
Buffer reset() |
将位置 position 转到以前设置的 mark 所在的位置 |
Buffer rewind() |
将位置设为 0, 取消设置的 mark |
Buffer
所有子类提供了两个用于数据操作的方法:get() 与 put() 方法
获取 Buffer 中的数据
get()
:读取单个字节
get(byte[] dst)
:批量读取多个字节到 dst 中
get(int index)
:读取指定索引位置的字节(不会移动 position)
放入数据到 Buffer 中
put(byte b)
:将给定单个字节写入缓冲区的当前位置
put(byte[] src)
:将 src 中的字节写入缓冲区的当前位置
put(int index, byte b)
:将指定字节写入缓冲区的索引位置(不会移动 position)
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Test public void test1() { String str = "abcde";
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println("-----------------allocate()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
buf.put(str.getBytes());
System.out.println("-----------------put()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
buf.flip();
System.out.println("-----------------flip()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
byte[] dst = new byte[buf.limit()]; buf.get(dst); System.out.println(new String(dst, 0, dst.length));
System.out.println("-----------------get()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
buf.rewind();
System.out.println("-----------------rewind()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
buf.clear();
System.out.println("-----------------clear()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity());
System.out.println((char)buf.get()); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Test public void test2(){ String str = "abcde"; ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(str.getBytes());
buf.flip();
byte[] dst = new byte[buf.limit()]; buf.get(dst, 0, 2); System.out.println(new String(dst, 0, 2)); System.out.println(buf.position());
buf.mark();
buf.get(dst, 2, 2); System.out.println(new String(dst, 2, 2)); System.out.println(buf.position());4
buf.reset(); System.out.println(buf.position());
if(buf.hasRemaining()){ System.out.println(buf.remaining()); } }
|
字节缓冲区
字节缓冲区(ByteBuffer
)比较特殊,它能够创建两种类型的缓冲区,分别是直接缓冲区和非直接缓冲区。直接缓冲区通过API
的allocateDirect()
方法创建,而非直接缓冲区则是平时使用的allocate()
方法。
非直接缓冲区
data:image/s3,"s3://crabby-images/01f5b/01f5b716515929f07d91b535c984b38f90c781c7" alt="nio-4"
正常情况下,应用程序都是在操作系统上的用户地址空间,俗称用户空间,我们的IO请求都是需要先经过用户空间,用户空间向内核空间发起IO请求,内核空间去物理磁盘加载数据,然后内核空间将加载后的数据拷贝到用户空间,用户空间才将数据返回给应用程序,也就是说,非直接缓冲区都需要经过一次数据从内核空间拷贝到用户空间的一个过程。
直接缓冲区
data:image/s3,"s3://crabby-images/a866b/a866b7392953d124d192cd03dc0d59829e9bef6b" alt="nio-5"
当使用直接缓冲区时, Java 虚拟机会尽最大努力直接在此缓冲区上执行本机IO
操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
直接字节缓冲区可以通过调用此类的allocateDirect()
工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。
直接缓冲区有一个很严重的劣势,就是当应用程序把数据存放到直接缓冲区时,它什么时候向物理磁盘触发IO
操作是无法控制被应用程序控制的,它是由操作系统掌控的。
并且响应结果到应用程序的效率不固定,很有可能出现磁盘IO
操作已经完成,但是应用程序尚未得到回应,导致内存数据无法被释放,当大规模的并发产生时,它带来的影响是致命的,但是不可否认的是,即时它短时间内未响应,效率也依旧比非直接缓冲区快!
所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
直接字节缓冲区还可以通过 FileChannel
的map()
方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer
。
如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
1 2 3 4 5 6
| @Test public void test3() { ByteBuffer buf = ByteBuffer.allocateDirect(1024); System.out.println(buf.isDirect()); }
|
判断字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect()
方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test public void test4() throws IOException{ long start = System.currentTimeMillis();
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
byte[] dst = new byte[inMappedBuf.limit()]; inMappedBuf.get(dst); outMappedBuf.put(dst);
inChannel.close(); outChannel.close();
long end = System.currentTimeMillis(); System.out.println("耗费时间为:" + (end - start)); }
|
通道
通道(Channel
):由 java.nio.channels
包定义的。Channel 表示 IO 源与目标打开的连接。
Channel
类似于传统的 流。只不过 Channel
本身不能直接访问数据,Channel
只能与Buffer
进行交互。
data:image/s3,"s3://crabby-images/7d729/7d729937e7047f7b281e2310991508e9a37ea507" alt="nio-6"
在以前,内核空间请求物理磁盘都需要通过请求CPU
提供的IO
接口才能访问,由于这样导致CPU
阻塞,利用率降低,所以分发了一块机器内存专门处理IO
接口,传统IO
流通过访问IO
接口,并由DMA
(直接存储器)向CPU
申请权限许可,当许可获得后,就可以将IO
操作授权给DMA
全权处理,而不需要占用CPU
的处理时间,大大提高了CPU
的利用率。
当有大量的应用程序向操作系统发起IO
请求时,会造成创建多个DMA
数据总线,当DMA
总线过多时,会造成总线冲突的问题。因为它们都会向CPU
申请权限许可,最终也会影响CPU
的性能。
data:image/s3,"s3://crabby-images/7a6db/7a6db6ed7941b7cba4efbd7ceed40c3349c93c4c" alt="nio-7"
通道Channel
附属于CPU
中央处理器,是一个完全独立的处理器,有一套自身定义的命令和传输方式,专门用于IO
操作,无需像CPU
获得权限许可,和CPU
彻底断绝关系,通道本质上和IO
流没什么区别,只是在大型IO
请求时,通道相较原来的IO
流,CPU
的利用率会更高。
实现类:
Java 为 Channel 接口提供的最主要实现类如下:
FileChannel
:用于读取、写入、映射和操作文件的通道。
DatagramChannel
:通过 UDP 读写网络中的数据通道。
SocketChannel
:通过 TCP 读写网络中的数据。
ServerSocketChannel
:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个SocketChannel
。
获取通道:
获取通道的一种方式是对支持通道的对象调用getChannel()
方法。支持通道的类如下:
FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
获取通道的其他方式是使用 Files
类的静态方法 newByteChannel()
获取字节通道。或者通过通道的静态方法 open()
打开并返回指定通道。
1 2 3 4 5 6
| public static FileChannel open(Path path, OpenOption... options) throws IOException { Set<OpenOption> set = new HashSet<OpenOption>(options.length); Collections.addAll(set, options); return open(path, set, NO_ATTRIBUTES); }
|
其中可变参数OpenOption
是指对文件的操作权限,具体可参考StandardOpenOption.class
。
StandardOpenOption.READ
只读模式
StandardOpenOption.WRITE
写模式
StandardOpenOption.CREATE
文件不存在则创建,存在则覆盖
StandardOpenOption.CREATE_NEW
文件不存在则创建,存在则报错
FileChannel的常用方法:
方法 |
描述 |
int read(ByteBuffer dst) |
从 Channel 中读取数据到 ByteBuffer |
long read(ByteBuffer[] dsts) |
将 Channel 中的数据“分散”到 ByteBuffer[] |
int write(ByteBuffer src) |
将 ByteBuffer 中的数据写入到 Channel |
long write(ByteBuffer[] srcs) |
将 ByteBuffer[]中的数据“聚集”到 Channel |
long position() |
返回此通道的文件位置 |
FileChannel position(long p) |
设置此通道的文件位置 |
long size() |
返回此通道的文件的当前大小 |
FileChannel truncate(long s) |
将此通道的文件截取为给定大小 |
void force(boolean metaData) |
强制将所有对此通道的文件更新写入到存储设备中 |
数据传输
通道之间的传输可以通过write()
、read()
接口,如下:
1 2 3 4
| int bytesWritten = outChannel.write(buf);
int bytesRead = inChannel.read(buf);
|
也可以使用更简便的 transferFrom()
、transferTo()
接口,通过直接缓冲区实现,如下:
1 2 3 4
| inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
|
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Test public void test1() throws Exception { long start = System.currentTimeMillis();
FileInputStream fis = new FileInputStream("1.jpg"); FileOutputStream fos = new FileOutputStream("2.jpg"); FileChannel inChannel = fis.getChannel(); FileChannel outChannel = fos.getChannel(); ByteBuffer buf = ByteBuffer.allocate(1024); while(inChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } outChannel.close(); inChannel.close(); fos.close(); fis.close();
long end = System.currentTimeMillis(); System.out.println("耗费时间为:" + (end - start)); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void test3() throws IOException{ FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
outChannel.transferFrom(inChannel, 0, inChannel.size());
inChannel.close(); outChannel.close(); }
|
分散和聚集
分散读取(Scattering Reads
)是指从 Channel
中读取的数据分散到多个 Buffer
中,按照缓冲区的顺序,从 Channel
中读取的数据依次将 Buffer 填满。
data:image/s3,"s3://crabby-images/9c126/9c126eedbedb36723819f22b7aaf98c5c59362a9" alt="nio-8"
聚集写入(Gathering Writes
)是指将多个 Buffer
中的数据聚集到 Channel,按照缓冲区的顺序,写入 position
和 limit
之间的数据到 Channel
。
data:image/s3,"s3://crabby-images/d7645/d7645f10fa0b1a8b19e3b475546f092fce96bb79" alt="nio-9"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Test public void test4() throws IOException{ RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw"); FileChannel channel1 = raf1.getChannel(); ByteBuffer buf1 = ByteBuffer.allocate(100); ByteBuffer buf2 = ByteBuffer.allocate(1024); ByteBuffer[] bufs = {buf1, buf2}; channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(), 0, bufs[0].limit())); System.out.println("-----------------"); System.out.println(new String(bufs[1].array(), 0, bufs[1].limit())); RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw"); FileChannel channel2 = raf2.getChannel(); channel2.write(bufs); }
|
字符集
Charset
用来处理字符串和字节数组相互转换的编码问题。
1 2 3 4 5 6 7 8 9 10 11
| @Test public void test5() { Map<String, Charset> map = Charset.availableCharsets();
Set<Entry<String, Charset>> set = map.entrySet(); for (Entry<String, Charset> entry : set) { System.out.println(entry.getKey() + "=" + entry.getValue()); } }
|
用什么编码格式转义的,就需要用什么编码格式反转义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Test public void test6() throws IOException { Charset cs1 = Charset.forName("GBK"); CharBuffer cBuf = CharBuffer.allocate(1024); cBuf.put("测试测试!"); cBuf.flip();
ByteBuffer bBuf = cs1.encode(cBuf); bBuf.flip(); CharBuffer cBuf2 = cs1.decode(bBuf); System.out.println(cBuf2.toString()); }
|
阻塞
传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read()
或 write()
时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。
data:image/s3,"s3://crabby-images/bb452/bb4525204cb81d94c5c2dd34ea48b1cdc4abddbb" alt="nio-10"
当客户端向服务端起请求时,服务端调度accept()
创建Socket
链接,客户端发送的数据首先会先到达内核空间,这时候的服务端需要等待内核空间完全接收完数据,数据准备完成后,将数据从内核空间拷贝到用户空间。
内核空间并不知道客户端数据是否发送完毕,或者是否有新数据发送,所以只能一直等待着客户端的响应,而在内核空间等待的这段时间中,服务端一直处于阻塞状态,它在等待着内核空间将数据拷贝到用户空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(1024);
while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } sChannel.shutdownOutput();
int len = 0; while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); }
inChannel.close(); sChannel.close(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Test public void server() throws IOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
ssChannel.bind(new InetSocketAddress(9898));
SocketChannel sChannel = ssChannel.accept();
ByteBuffer buf = ByteBuffer.allocate(1024);
while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); }
buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf);
sChannel.close(); outChannel.close(); ssChannel.close(); }
|
非阻塞
Java NIO
是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO
可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
data:image/s3,"s3://crabby-images/03527/035270d95b2c022a5041b2a5c2df780ab0de2875" alt="nio-11"
当用户线程发起一个read
操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个error
时,它就知道数据还没有准备好,于是它可以再次发送read
操作。
客户端的所有通道都将注册到Selector
选择器上,选择器将监控通道上的IO
状况,当通道上的数据准备就绪后,选择器才会将这个任务分配到服务端一个或者多个线程上运行。
选择器
选择器(Selector
) 是 SelectableChannle
对象的多路复用器,Selector 可以同时监控多个 SelectableChannel
的 IO 状况,也就是说,利用 Selector
可使一个单独的线程管理多个 Channel
,Selector
是非阻塞 IO 的核心。
data:image/s3,"s3://crabby-images/98c2e/98c2eac07d014a660c7170ac014a5b9bacb49546" alt="nio-12"
1 2
| Selector selector = Selector.open();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.bind(new InetSocketAddress(9898));
Selector selector = Selector.open();
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
|
当调度上面的第五步的regitster
接口将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。
可以监听的事件类型(可使用 SelectionKey 的四个常量表示):
读 : SelectionKey.OP_READ
(1)
写 : SelectionKey.OP_WRITE
(4)
连接 : SelectionKey.OP_CONNECT
(8)
接收 : SelectionKey.OP_ACCEPT
(16)
若注册时不止监听一个事件,则可以使用“位或”操作符连接。
1
| int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
|
SelectionKey
:表示 SelectableChannel
和 Selector
之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含两个表示为整数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操作。
Selector 常用方法:
方法 |
描述 |
Set keys() |
所有的 SelectionKey 集合。代表注册在该Selector上的Channel |
selectedKeys() |
被选择的 SelectionKey 集合。返回此Selector的已选择键集 |
int select() |
监控所有注册的Channel,当它们中间有需要处理的 IO 操作时,该方法返回,并将对应得的 SelectionKey 加入被选择的SelectionKey 集合中,该方法返回这些 Channel 的数量。 |
int select(long timeout) |
可以设置超时时长的 select() 操作 |
int selectNow() |
执行一个立即返回的 select() 操作,该方法不会阻塞线程 |
Selector wakeup() |
使一个还未返回的 select() 方法立即返回 |
void close() |
关闭该选择器 |
SelectionKey 常用方法:
方法 |
描述 |
int interestOps() |
获取感兴趣事件集合 |
int readyOps() |
获取通道已经准备就绪的操作的集合 |
SelectableChannel channel() |
获取注册通道 |
Selector selector() |
返回选择器 |
boolean isReadable() |
检测 Channal 中读事件是否就绪 |
boolean isWritable() |
检测 Channal 中写事件是否就绪 |
boolean isConnectable() |
检测 Channel 中连接是否就绪 |
boolean isAcceptable() |
检测 Channel 中接收是否就绪 |
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
sChannel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){ String str = scan.next(); buf.put((new Date().toString() + "\n" + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); }
sChannel.close(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Test public void server() throws IOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.bind(new InetSocketAddress(9898));
Selector selector = Selector.open();
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
while(selector.select() > 0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){ SelectionKey sk = it.next();
if(sk.isAcceptable()){ SocketChannel sChannel = ssChannel.accept();
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ); }else if(sk.isReadable()){ SocketChannel sChannel = (SocketChannel) sk.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0; while((len = sChannel.read(buf)) > 0 ){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } }
it.remove(); } } }
|
管道
Java NIO 管道是2个线程之间的单向数据连接。Pipe
有一个source
通道和一个sink
通道。数据会被写到sink
通道,从source
通道读取。
data:image/s3,"s3://crabby-images/779e0/779e03dcff22cee99593dd247966098b73cbe748" alt="nio-13"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Test public void test1() throws IOException{ Pipe pipe = Pipe.open(); ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink(); buf.put("通过单向管道发送数据".getBytes()); buf.flip(); sinkChannel.write(buf);
Pipe.SourceChannel sourceChannel = pipe.source(); buf.flip(); int len = sourceChannel.read(buf); System.out.println(new String(buf.array(), 0, len)); sourceChannel.close(); sinkChannel.close(); }
|