Memory and IO, Disk IO, Network IO

File Descriptors

Commonly used tools:
yum install -y strace lsof pmap tcpdump
VFS (Virtual File System): an example

[root@node01 ~]# df
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/sda3 202092480 10776508 181050220 6% /
tmpfs 1954400 0 1954400 0% /dev/shm
/dev/sda1 198337 27795 160302 15% /boot
df 
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/sda3 202092480 7187520 184639208 4% /
tmpfs 1954400 0 1954400 0% /dev/shm
/dev/sda1 198337 27795 160302 15% /boot
Testing the pipeline fd type:
{ echo $BASHPID ; read x; } | { cat ; echo $BASHPID ; read y; }
Testing the socket fd type:
exec 8<> /dev/tcp/www.baidu.com/80
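With fd 8 opened on the TCP socket, you can speak HTTP through it straight from bash (a minimal sketch; baidu answers a bare HTTP/1.0 request line):
echo -e "GET / HTTP/1.0\n" >& 8
cat <& 8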
# inspect the connection four-tuples (local/remote address and port)
[root@localhost ~]# netstat -natp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1235/master
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1008/sshd
tcp 0 0 192.168.163.129:22 192.168.163.1:2307 ESTABLISHED 1525/sshd: root@pts
tcp 0 0 192.168.163.129:37606 211.139.243.23:80 TIME_WAIT -
tcp 0 0 192.168.163.129:37602 211.139.243.23:80 TIME_WAIT -
tcp6 0 0 ::1:25 :::* LISTEN 1235/master
tcp6 0 0 :::22 :::* LISTEN 1008/sshd
# inspect file descriptors, offsets, ports, and so on
[root@localhost ~]# lsof -op $$
COMMAND PID USER FD TYPE DEVICE OFFSET NODE NAME
bash 1529 root cwd DIR 253,0 100663361 /root
bash 1529 root rtd DIR 253,0 64 /
bash 1529 root txt REG 253,0 8388 /usr/bin/bash
bash 1529 root 0u CHR 136,0 0t0 3 /dev/pts/0
bash 1529 root 1u CHR 136,0 0t0 3 /dev/pts/0
bash 1529 root 2u CHR 136,0 0t0 3 /dev/pts/0
bash 1529 root 255u CHR 136,0 0t0 3 /dev/pts/0
Reading from fd 6 into the variable a advances fd 6's OFFSET:
read a <& 6

fd: a file descriptor stands for an open file; it carries the notions of an inode number and a seek offset pointer.
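To watch the per-fd offsets move, open two descriptors on the same file (a small sketch using /etc/passwd as an arbitrary file):
exec 6< /etc/passwd   # open fd 6 for reading
exec 7< /etc/passwd   # a second fd on the same file, with its own offset
read a <& 6           # advances only fd 6's offset
lsof -op $$           # OFFSET column: fd 6 has moved, fd 7 still shows 0t0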
Pipes:
1. Chaining: the output of the previous command becomes the input of the next.
2. A pipe triggers the creation of child processes: the commands on both sides of the pipe are children of the current /bin/bash.

echo $$ | more       --> prints the PID of the current bash
echo $BASHPID | more --> prints the PID of the current bash's child process
Reason: $$ is expanded before the | is processed, i.e. before the pipeline forks; $BASHPID is evaluated inside the forked child.

When using Linux:
by default a child process cannot see the parent process's data, let alone modify it.

Conventional view: processes are data-isolated!

Going a step further: a parent can in fact let its children see data, via export.

In Linux,
a child's modification of an exported environment variable does not affect the parent,
and the parent's modification does not affect the child either.

fork() duplicates the current process. The new process gets a copy of all of the original's data (variables, environment variables, program counter, and so on), but it is a brand-new process running as a child of the original.
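Both points in one quick demonstration: export makes the variable visible to the child, and the pipeline's fork keeps the two copies separate:
export x=100
{ x=200; echo "child: $x"; } | cat   # the left side runs in a forked child: prints child: 200
echo "parent: $x"                    # prints parent: 100 -- the child's change never came back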
sysctl -a | grep dirty

vi /etc/sysctl.conf
# Dirty-page policy
# Once dirty pages reach this percentage of available memory, the kernel starts
# writing them back to disk in the background
vm.dirty_background_ratio = 0
vm.dirty_background_bytes = 1048576
# Once dirty pages reach this percentage of available memory, writes BLOCK
# while the dirty pages are persisted to disk
vm.dirty_ratio = 0
vm.dirty_bytes = 1048576
vm.dirty_writeback_centisecs = 5000
vm.dirty_expire_centisecs = 30000
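After editing /etc/sysctl.conf, reload it and watch the kernel's dirty-page counters while something writes to disk:
sysctl -p                                    # reload the configuration
egrep "nr_dirty|nr_writeback" /proc/vmstat   # nr_dirty climbs, then drops as pages get flushed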

Virtual Memory and Memory Mapping

Different FDs can share access to the same pagecache; each one reads the pagecache's data through its own seek offset.

When a page in the pagecache is modified, that page is marked dirty. If the system loses power before the pagecache has been flushed to disk, the dirty pages that were never persisted are lost.

The pagecache optimizes IO performance, but at the risk of losing data.

Disk IO

// The most basic file write: every single write is a syscall into the kernel
public static void testBasicFileIO() throws Exception {
    File file = new File(path);
    FileOutputStream out = new FileOutputStream(file);
    while (true) {
        out.write(data);
    }
}

Why is buffered IO faster than plain IO?
Because the Buffer cuts down on syscalls: it maintains an internal 8KB buffer and only issues a syscall once 8KB have accumulated, instead of triggering one on every pass through the write loop.

// Buffered file IO: only calls into the kernel once a full buffer has accumulated
// jvm: 8kB -> syscall write(8KB byte[])

public static void testBufferedFileIO() throws Exception {
    File file = new File(path);
    BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file));
    while (true) {
        out.write(data);
    }
}
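One way to see the difference is to strace both variants and compare the write syscalls; the launcher class name and arguments below are hypothetical stand-ins for however you run the two methods:
strace -ff -o out java OSFileIO basic      # one small write(fd, ...) per loop iteration
strace -ff -o out java OSFileIO buffered   # one write(fd, ..., 8192) per 8KB accumulated
grep -c 'write(' out.<pid>                 # count them per thread file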
// mmap maps a region shared between the process and the kernel
public static void testRandomAccessFileWrite() throws Exception {

    RandomAccessFile raf = new RandomAccessFile(path, "rw");

    raf.write("hello mashibing\n".getBytes());
    raf.write("hello seanzhou\n".getBytes());
    System.out.println("write------------");
    System.in.read();

    // seek moves the offset pointer
    raf.seek(4);
    raf.write("ooxx".getBytes());

    System.out.println("seek---------");
    System.in.read();

    FileChannel rafchannel = raf.getChannel();
    // mmap: off-heap memory mapped onto the file -- bytes, not objects
    MappedByteBuffer map = rafchannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);


    map.put("@@@".getBytes()); // not a syscall, yet the data reaches the kernel's pagecache
    // previously we needed a syscall such as out.write() to get program data into the kernel's pagecache
    // previously a user-mode/kernel-mode switch was unavoidable
    // the mmap mapping is still governed by the kernel's pagecache mechanism!!!
    // in other words, data can still be lost
    // on github you can find JNI extension libraries written by C programmers that use the Linux kernel's Direct IO
    // direct IO bypasses the Linux pagecache
    // it hands the pagecache over to the program: you allocate your own byte array to act as the pagecache
    // and maintain consistency/dirty tracking in your own code -- a whole series of hard problems

    System.out.println("map--put--------");
    System.in.read();

    //map.force(); // flush



    raf.seek(0);

    ByteBuffer buffer = ByteBuffer.allocate(8192);
    //ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    int read = rafchannel.read(buffer); //buffer.put()
    System.out.println(buffer);
    buffer.flip();
    System.out.println(buffer);

    for (int i = 0; i < buffer.limit(); i++) {
        Thread.sleep(200);
        System.out.print(((char) buffer.get(i)));
    }
}
// direct buffer
public void whatByteBuffer() {

    // ByteBuffer buffer = ByteBuffer.allocate(1024);
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);


    System.out.println("position: " + buffer.position());
    System.out.println("limit: " + buffer.limit());
    System.out.println("capacity: " + buffer.capacity());
    System.out.println("mark: " + buffer);

    buffer.put("123".getBytes());

    System.out.println("-------------put:123......");
    System.out.println("mark: " + buffer);

    buffer.flip(); // switch between write mode and read mode

    System.out.println("-------------flip......");
    System.out.println("mark: " + buffer);

    buffer.get();

    System.out.println("-------------get......");
    System.out.println("mark: " + buffer);

    buffer.compact();

    System.out.println("-------------compact......");
    System.out.println("mark: " + buffer);

    buffer.clear();

    System.out.println("-------------clear......");
    System.out.println("mark: " + buffer);

}
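For reference, these are the pointer movements the printed mark lines should show (standard ByteBuffer semantics):
// after allocateDirect(1024):  position=0  limit=1024  capacity=1024
// after put("123".getBytes()): position=3  limit=1024
// after flip():                position=0  limit=3     (ready to read what was just written)
// after get():                 position=1  limit=3
// after compact():             position=2  limit=1024  (the 2 unread bytes are copied to the front; back to write mode)
// after clear():               position=0  limit=1024  (pointers reset; the bytes themselves are not erased)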

Network IO

public class SocketIOPropertites {

    // server socket listen properties:
    // receive buffer size
    private static final int RECEIVE_BUFFER = 10;
    // accept timeout (0 = block forever)
    private static final int SO_TIMEOUT = 0;
    private static final boolean REUSE_ADDR = false;
    // size of the kernel's pending-connection queue (four-tuples not yet associated with a process); overflow behaves like packet loss
    private static final int BACK_LOG = 2;
    // keepalive heartbeat
    private static final boolean CLI_KEEPALIVE = false;
    private static final boolean CLI_OOB = false;
    private static final int CLI_REC_BUF = 20;
    private static final boolean CLI_REUSE_ADDR = false;
    private static final int CLI_SEND_BUF = 20;
    private static final boolean CLI_LINGER = true;
    private static final int CLI_LINGER_N = 0;
    private static final int CLI_TIMEOUT = 0;
    // TCP batches small writes instead of sending every few bytes immediately;
    // whether that helps depends on the workload -- otherwise one flush may burst too large a packet
    private static final boolean CLI_NO_DELAY = false;
    /*

    StandardSocketOptions.TCP_NODELAY
    StandardSocketOptions.SO_KEEPALIVE
    StandardSocketOptions.SO_LINGER
    StandardSocketOptions.SO_RCVBUF
    StandardSocketOptions.SO_SNDBUF
    StandardSocketOptions.SO_REUSEADDR

    */

    public static void main(String[] args) {

        ServerSocket server = null;
        try {
            server = new ServerSocket();
            server.bind(new InetSocketAddress(9090), BACK_LOG);
            server.setReceiveBufferSize(RECEIVE_BUFFER);
            server.setReuseAddress(REUSE_ADDR);
            server.setSoTimeout(SO_TIMEOUT);

        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("server up use 9090!");
        try {
            while (true) {

                // System.in.read(); // the watershed: uncomment to hold off accepting

                Socket client = server.accept(); // blocking: there is no -1, it just sits inside accept(4,...
                System.out.println("client port: " + client.getPort());

                client.setKeepAlive(CLI_KEEPALIVE);
                client.setOOBInline(CLI_OOB);
                client.setReceiveBufferSize(CLI_REC_BUF);
                client.setReuseAddress(CLI_REUSE_ADDR);
                client.setSendBufferSize(CLI_SEND_BUF);
                client.setSoLinger(CLI_LINGER, CLI_LINGER_N);
                client.setSoTimeout(CLI_TIMEOUT);
                client.setTcpNoDelay(CLI_NO_DELAY);

                //client.read // blocking; never returns -1 or 0
                new Thread(
                        () -> {
                            try {
                                InputStream in = client.getInputStream();
                                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                                char[] data = new char[1024];
                                while (true) {

                                    int num = reader.read(data);

                                    if (num > 0) {
                                        System.out.println("client read some data is :" + num + " val :" + new String(data, 0, num));
                                    } else if (num == 0) {
                                        System.out.println("client read nothing!");
                                        continue;
                                    } else {
                                        System.out.println("client read -1...");
                                        System.in.read();
                                        client.close();
                                        break;
                                    }
                                }

                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                        }
                ).start();

            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
public class SocketClient {

    public static void main(String[] args) {

        try {
            Socket client = new Socket("192.168.150.11", 9090);
            client.setSendBufferSize(20);

            client.setTcpNoDelay(false);
            OutputStream out = client.getOutputStream();

            InputStream in = System.in;
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));

            while (true) {
                String line = reader.readLine();
                if (line != null) {
                    byte[] bb = line.getBytes();
                    for (byte b : bb) {
                        out.write(b);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Dimensions of a TCP conversation:
win : window size
seq : sequence number
mss : maximum segment size -- the real payload size, excluding protocol headers
When the client sends a FIN to the server, it carries its own seq number.
When the server replies FIN + ACK to the client, it acknowledges the peer's seq + 1 and carries its own seq number.

When establishing a TCP connection the client advertises how much data it can receive (the sliding window), e.g. [win 14600] in the capture above.
When the server replies with its ACK it advertises its own receive window: [win 1448].

[root@localhost ~]# ifconfig
ens33: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500

ifconfig shows the largest packet the interface can receive at a time: MTU 1500 bytes.

Flow control (via the advertised window; often lumped together with congestion control):
every time the server receives data it reports back how much more it can accept.
If the client would exceed what the server can accept, the client must block and wait for the server to catch up.
Once the server frees up space it sends a window update to the client, signalling that sending may resume.
If the client keeps sending while the server has no free space, the server simply drops the packets -- packet loss.

For TCP congestion control proper, see: slow start, congestion avoidance, fast retransmit, fast recovery.
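The capture referred to above can be reproduced with tcpdump (the interface name is environment-specific):
tcpdump -nn -i ens33 port 9090   # shows the three-way handshake with its win/seq/mss options, then the data and teardown packets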

BIO

public class SocketBIO {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(8090, 20);

        System.out.println("step1: new ServerSocket(8090) ");

        while (true) {
            Socket client = server.accept(); // blocking point 1
            System.out.println("step2:client\t" + client.getPort());

            new Thread(new Runnable() {

                public void run() {
                    InputStream in = null;
                    try {
                        in = client.getInputStream();
                        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                        while (true) {
                            String dataline = reader.readLine(); // blocking point 2

                            if (null != dataline) {
                                System.out.println(dataline);
                            } else {
                                client.close();
                                break;
                            }
                        }
                        System.out.println("client disconnected");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
Tracing a program's syscalls into the kernel: strace -ff -o out <cmd>

[root@localhost ~]# strace -ff -o out /home/local/java/jdk1.8.0_191/bin/java SocketBIO
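strace writes one out.<pid> file per thread, so grepping the files shows where each thread is parked (illustrative; the exact syscall names vary by JDK):
grep -l "accept(" out.*   # the accept loop's file ends blocked in accept(
grep -l "recv" out.*      # each per-client thread's file ends blocked in a recv/read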

NIO

public class SocketNIO {

    // what why how
    public static void main(String[] args) throws Exception {

        LinkedList<SocketChannel> clients = new LinkedList<>();

        ServerSocketChannel ss = ServerSocketChannel.open(); // the server starts listening: accepts clients
        ss.bind(new InetSocketAddress(9090));
        ss.configureBlocking(false); // the key point: OS NONBLOCKING!!! accepting clients no longer blocks

        while (true) {
            // accept incoming connections
            Thread.sleep(1000);
            SocketChannel client = ss.accept(); // does not block! returns -1 / NULL
            // accept calls into the kernel: with no client waiting, what comes back?
            // under BIO it just sat there; under NIO it doesn't block -- the kernel returns -1 (NULL up here)
            // if a client did connect, accept returns that client's fd, e.g. fd 5 -> a client object
            // NONBLOCKING simply means the code can keep moving; you just handle the different outcomes

            if (client == null) {
                // System.out.println("null.....");
            } else {
                client.configureBlocking(false); // key point: the listen socket (the three-way handshake lands there; accept takes the connection off it) vs. the connection socket (used for the actual data R/W)
                int port = client.socket().getPort();
                System.out.println("client..port: " + port);
                clients.add(client);
            }

            ByteBuffer buffer = ByteBuffer.allocateDirect(4096); // could live on-heap or off-heap

            // poll every connected client to see whether it has data to read or write
            for (SocketChannel c : clients) { // serialized!!!! could be multithreaded!!
                int num = c.read(buffer); // >0, -1 or 0 -- never blocks
                if (num > 0) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);

                    String b = new String(aaa);
                    System.out.println(c.socket().getPort() + " : " + b);
                    buffer.clear();
                }
            }
        }
    }
}

Linux Kernel Functions

// create a file descriptor
int socket(int domain, int type, int protocol);

// bind an address (the local half of the four-tuple)
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

// listen for TCP connection requests from clients; backlog is the size of the wait queue
int listen(int sockfd, int backlog);

// establish the connection with a new client
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

// read data sent by the client
ssize_t recv(int sockfd, void *buf, size_t len, int flags);

// pass in the list of fds/events to poll, the number of descriptors to watch, and a timeout (-1 = block)
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// manipulate an fd's settings, e.g. read/write flags, non-blocking mode
int fcntl(int fd, int cmd, ... /* arg */ );

// create an epoll instance of size `size` (backed by a red-black tree); returns epfd, roughly the tree's root handle
int epoll_create(int size);

// add an fd to the red-black tree; op is the operation (add/del/...), fd is the descriptor being registered, epoll_event is the event set to watch
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// given the tree's epfd, fetch the fds already sitting on the ready list
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

// fd states in EPOLL
EPOLLIN    a connection arrived / data is available
           The associated file is available for read(2) operations.
EPOLLOUT   there is room to write
           The associated file is available for write(2) operations.
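Putting these calls together, the canonical epoll server loop has the following shape (a minimal C sketch; error handling is omitted and BACKLOG, MAX_EVENTS, addr and buf are assumed to be defined):

int lfd = socket(AF_INET, SOCK_STREAM, 0);         // the listen fd
bind(lfd, (struct sockaddr *)&addr, sizeof(addr));
listen(lfd, BACKLOG);

int epfd = epoll_create(1);                        // the red-black tree
struct epoll_event ev = {0}, events[MAX_EVENTS];
ev.events = EPOLLIN;
ev.data.fd = lfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);          // register the listen fd

for (;;) {
    int n = epoll_wait(epfd, events, MAX_EVENTS, -1);  // fetch the ready list
    for (int i = 0; i < n; i++) {
        if (events[i].data.fd == lfd) {
            int cfd = accept(lfd, NULL, NULL);             // new client connection
            ev.events = EPOLLIN;
            ev.data.fd = cfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);      // register it as well
        } else {
            recv(events[i].data.fd, buf, sizeof(buf), 0);  // the fd is ready for R/W
        }
    }
}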

Multiplexers

# JVM flags on Linux to force EPOLL or POLL; by default the best-performing one is chosen
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

# trace the Linux kernel calls
javac SocketMultiplexingSingleThreadv1.java
strace -ff -o out java SocketMultiplexingSingleThreadv1

# simulate a client creating a connection, on Linux
nc 192.168.163.1 9090

# simulate a server waiting for clients, on Linux
nc -l 192.168.163.1 9090
public class SocketMultiplexingSingleThreadv1 {

    private ServerSocketChannel server = null;
    private Selector selector = null; // the Linux multiplexers (select poll epoll kqueue); cf. nginx event{}
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));


            // under the epoll model, open() --> epoll_create -> fd3
            selector = Selector.open(); // select poll *epoll -- epoll is preferred, but can be overridden with -D

            // server is, roughly, the fd4 sitting in listen state
            /*
            register
            if:
            select, poll: the jvm keeps an array and puts fd4 into it
            epoll:        epoll_ctl(fd3, ADD, fd4, EPOLLIN)
            */
            server.register(selector, SelectionKey.OP_ACCEPT);


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("server started.....");
        try {
            while (true) { // the event loop

                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size() + " size");


                // 1. invoke the multiplexer (select, poll, or epoll (epoll_wait))
                /*
                what does select() mean here:
                1. select/poll: really the kernel's select(fd4) / poll(fd4)
                2. epoll:       really the kernel's epoll_wait()
                *  the parameter can carry a time: no time or 0 means block; a value sets a timeout
                   selector.wakeup() makes it return 0

                lazy registration:
                the epoll_ctl call is really only triggered the moment selector.select() is touched
                */
                while (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys(); // the set of fds that have a status
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    // so: whatever the multiplexer is, it only hands me *status*; I still have to do each R/W myself. synchronous is hard work!!!
                    // plain NIO issues a syscall per fd, wasting resources; here a single select() call tells us exactly which fds are ready for R/W
                    // as stressed before, sockets come in two kinds: listen sockets and R/W connection sockets
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); // it's a set: if the key is not removed it gets processed again on the next loop
                        if (key.isAcceptable()) {
                            // this is the interesting part: accepting a new connection
                            // semantically, accept takes the connection and returns the new connection's FD, right?
                            // so what happens to the new FD?
                            // select/poll: the kernel keeps no state, so the jvm stores it alongside fd4, the listen fd
                            // epoll: we want epoll_ctl to register the new client fd into kernel space
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key); // handles read -- and the write-back too, for that matter
                            // on the current thread this method may block; if it blocked for ten years, every other IO would starve...
                            // which is exactly why IO THREADS were invented
                            // redis uses epoll, redis has an "io threads" notion -- and yes, redis is single-threaded
                            // tomcat 8/9 went asynchronous, decoupling IO from processing
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); // here it is: accept the client, e.g. fd7
            client.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(8192); // covered earlier

            // and look: register is called again
            /*
            select, poll: the jvm keeps an array and puts fd7 into it
            epoll:        epoll_ctl(fd3, ADD, fd7, EPOLLIN)
            */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("new client: " + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }
}

Underlying Implementation

States

The four-way teardown:

1. The server sends a FIN to initiate the disconnect.

2. The client receives the FIN and returns an ACK.

3. The client then sends its own FIN to close its side.

4. The server receives that FIN and returns an ACK.

After a normal four-way teardown the client ends up in the CLOSED state.

Because the server initiated the disconnect, the server is the side left in the TIME_WAIT state.

If the server code never calls client.close():

1. The client sends a FIN to initiate the disconnect.

2. The server receives the FIN and returns an ACK.

3. The server marks itself CLOSE_WAIT.

4. The client receives the ACK and marks itself FIN_WAIT2, waiting for the server's own FIN.

5. Because the server never calls client.close(), the second half of the teardown is never sent.

6. The client never receives the server's FIN, so it stays in FIN_WAIT2 (and the server stays in CLOSE_WAIT).
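The stuck pair is easy to spot with netstat (illustrative output; addresses and PID are made up):
[root@localhost ~]# netstat -natp | grep 9090
tcp 0 0 192.168.150.11:9090 192.168.150.1:51234 CLOSE_WAIT 2212/java
The same connection viewed from the client machine sits in FIN_WAIT2, while CLOSE_WAIT lingers until the server process closes the fd or exits.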

Single Selector, Single Thread

public class SocketMultiplexingSingleThreadv1_1 {

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            selector = Selector.open(); // select poll *epoll
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("server started.....");
        try {
            while (true) {
                while (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key); // handles read only, and registers interest in this key's write event

                        } else if (key.isWritable()) {
                            writeHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void writeHandler(SelectionKey key) {

        System.out.println("write handler...");
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();
        while (buffer.hasRemaining()) {
            try {

                client.write(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        buffer.clear();
        key.cancel();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("new client: " + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {

        System.out.println("read handler.....");
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                // register the write event from within the read handler
                if (read > 0) {
                    client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
                    // caring about OP_WRITE really means caring about whether the send-queue has free space
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1_1 service = new SocketMultiplexingSingleThreadv1_1();
        service.start();
    }
}

In the single-threaded Selector above, accept, read and write all happen on one thread. Nothing is wrong with that per se, but CPU resources go underused, and if one client takes a long time to process, events pile up behind it.

Single Selector, Multiple Threads

public class SocketMultiplexingSingleThreadv2 {

    private ServerSocketChannel server = null;
    private Selector selector = null; // the Linux multiplexers (select poll epoll); cf. nginx event{}
    int port = 9090;

    public void initServer() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            selector = Selector.open(); // select poll *epoll
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("server started.....");
        try {
            while (true) {
                while (selector.select(50) > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            // key.cancel(); // alternatively, cancel the key right inside the multiplexer
                            System.out.println("in.....");
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                            readHandler(key); // still blocking? even though a thread is spawned for the read,
                            // in the time gap this key's read event would keep re-firing -- hence dropping OP_READ above

                        } else if (key.isWritable()) {
                            // the write event <-- as long as the send-queue has space, it will report writable and call back our write method
                            // really understand this: *when* to write is not decided by the send-queue having space
                            // 1. first you must have something prepared to write
                            // 2. only then do you care whether the send-queue has space
                            // 3. so: read is registered from the start, but write is registered only when the above holds
                            // 4. registering the write event from the start means a hot loop -- it fires forever!!!
                            // key.cancel();
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            writeHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void writeHandler(SelectionKey key) {
        new Thread(() -> {
            System.out.println("write handler...");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.flip();
            while (buffer.hasRemaining()) {
                try {

                    client.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            buffer.clear();
        }).start();

    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("new client: " + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        new Thread(() -> {
            System.out.println("read handler.....");
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            int read = 0;
            try {
                while (true) {
                    read = client.read(buffer);
                    System.out.println(Thread.currentThread().getName() + " " + read);
                    if (read > 0) {
                        key.interestOps(SelectionKey.OP_READ); // re-register read interest now that this round is read

                        client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
                    } else if (read == 0) {

                        break;
                    } else {
                        client.close();
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();
        service.start();
    }
}

Multiple Selectors, One Thread Each

V1: mixed mode -- a single thread handles accept, and every thread is assigned clients to R/W

V2: the Selector at index [0] registers only the ACCEPT event; the other Selectors register the R/W events

V3: a Boss thread group whose Selectors handle the ACCEPT event, and a Worker thread group whose Selectors handle the R/W events
public class MainSocketServer {
    public static void main(String[] args) throws IOException {
        // V3 bootstrap code
        // create the Boss thread group with num = 3 threads
        SelectorThreadGroup bossGroup = new SelectorThreadGroup(3);
        // create the Worker thread group with num = 3 threads
        SelectorThreadGroup workerGroup = new SelectorThreadGroup(3);
        // start the threads
        bossGroup.start();
        workerGroup.start();
        // wire the Boss group to the Worker group
        bossGroup.setWorker(workerGroup);
        // the Boss group registers the ServerSockets
        bossGroup.bind(7777, 8888, 9999);

        // V1/V2 bootstrap code
        //SelectorThreadGroup stg = new SelectorThreadGroup(3);
        //stg.start();
        //stg.bind(7777, 8888, 9999);
    }
}
// one Selector per thread;
// with multiple threads, this host's concurrent clients are spread across several Selectors
// note: each client is bound to exactly one Selector,
// so there are no cross-Selector interference problems
public class SelectorThread implements Runnable {

    SelectorThreadGroup stg;

    Selector selector;

    // the communication queue between this thread and the thread group
    LinkedBlockingDeque<Channel> queue = new LinkedBlockingDeque<>();

    public SelectorThread(SelectorThreadGroup stg) throws IOException {
        this.stg = stg;
        this.selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            while (true) {
                // 1. block; while no event is registered we rely on wakeup() to break out
                int num = selector.select();
                // 2. process selectedKeys
                if (num > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    // each Selector processes its own events serially; the SelectorThreads handle disjoint events
                    // events on different Selectors cannot affect each other, so no key.cancel() is needed
                    while (iterator.hasNext()) {
                        SelectionKey next = iterator.next();
                        iterator.remove();
                        if (next.isAcceptable()) {
                            acceptHandler(next);
                        } else if (next.isReadable()) {
                            readHandler(next);
                        }
                    }
                }

                // 3. process tasks: the queue holds new listen/client channels that must be registered with this Selector
                while (!queue.isEmpty()) {
                    Channel channel = queue.take();
                    registerChannel(channel);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // register a Channel
    public void registerChannel(Channel channel) throws IOException {
        if (channel instanceof ServerSocketChannel) {
            ServerSocketChannel server = (ServerSocketChannel) channel;
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(Thread.currentThread().getName() + " " + server.getLocalAddress() + " ServerSocket has register to Selector...");
        } else {
            SocketChannel client = (SocketChannel) channel;
            client.configureBlocking(false);
            ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
            client.register(selector, SelectionKey.OP_READ, byteBuffer);
            System.out.println(Thread.currentThread().getName() + " " + client.getRemoteAddress() + " Socket has register to Selector...");
        }

    }

    public void acceptHandler(SelectionKey key) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        System.out.println(Thread.currentThread().getName() + " " + channel.getLocalAddress() + " accept listen...");

        SocketChannel client = channel.accept();

        // pick a Selector according to the thread group's policy and register the event there
        stg.registerV3(client);
        //stg.registerV2(client);
        //stg.registerV1(client);
    }

    public void readHandler(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        System.out.println(Thread.currentThread().getName() + " register read..." + channel.getRemoteAddress());
        ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
        byteBuffer.clear();
        while (true) {
            int read = channel.read(byteBuffer);
            if (read > 0) {
                byteBuffer.flip();
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
                byteBuffer.clear();
            } else if (read == 0) {
                break;
            } else {
                System.out.println("client disconnected: " + channel.getRemoteAddress());
                channel.close(); // read returned -1: the peer closed, so close our end too
                break;
            }
        }
    }
}
public class SelectorThreadGroup {

    SelectorThread[] sts;

    // used to compute the array index
    AtomicInteger count = new AtomicInteger();

    // V3 only: the group created first acts as the Boss group by default and needs a reference to the Worker group
    SelectorThreadGroup worker = this;


    public SelectorThreadGroup(int num) throws IOException {
        // num = number of threads
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            sts[i] = new SelectorThread(this);
        }
    }

    // start the threads
    public void start() {
        for (int i = 0; i < sts.length; i++) {
            new Thread(sts[i]).start();
        }
    }

    public void setWorker(SelectorThreadGroup worker) {
        this.worker = worker;
    }

    // create the ServerSocket port listeners
    public void bind(int... ports) throws IOException {
        for (int port : ports) {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            registerV3(server);
            //registerV2(server);
            //registerV1(server);
        }
        System.out.println("server started......");
    }


    // V3: Boss-group Selectors register the accept event; Worker-group Selectors register the R/W events
    public void registerV3(Channel channel) {
        SelectorThread st = null;
        if (channel instanceof ServerSocketChannel) {
            // register ACCEPT with the Boss group: pick a Selector from this group (reusing the V1 method)
            st = this.nextSelectorV1();
        } else {
            // register R/W with the Worker group: pick a Selector from the worker group (reusing the V1 method)
            st = worker.nextSelectorV1();
        }
        // 1. hand the channel over through the queue (message passing)
        st.queue.add(channel);
        // 2. break that thread's select() so it completes the registration itself after waking up
        st.selector.wakeup();
    }

    // V2: only the first Selector registers the accept event
    public void registerV2(Channel channel) {
        SelectorThread st = null;
        if (channel instanceof ServerSocketChannel) {
            st = sts[0];
        } else {
            st = nextSelectorV2();
        }
        st.queue.add(channel);
        st.selector.wakeup();

    }

    // V2: return one of the Selectors at indexes [1, num) of the sts array
    public SelectorThread nextSelectorV2() {
        int index = count.getAndIncrement() % (sts.length - 1);
        return sts[index + 1];
    }

    // V1: round-robin -- pick any Selector and register there
    public void registerV1(Channel channel) {
        SelectorThread st = nextSelectorV1();
        st.queue.add(channel);
        st.selector.wakeup();
    }

    // V1: round-robin over the sts array
    public SelectorThread nextSelectorV1() {
        int index = count.getAndIncrement() % sts.length;
        return sts[index];
    }
}

Netty Reactive Programming

The Buffer Pool Concept

With plain reference counting, a Buffer instance is destroyed (e.g. reclaimed by GC) as soon as nothing references it. But the memory allocation behind a Buffer is relatively expensive, so creating Buffers frequently means frequent allocation and release, which hurts performance. Netty 4 therefore added an object pooling mechanism: when a Buffer's reference count drops to zero it is placed into an object cache pool instead of being destroyed immediately, and when one is needed again it is taken back out of the pool rather than created from scratch.

PooledByteBuf

It extends AbstractReferenceCountedByteBuf, layering a pooling mechanism on top of reference counting to reduce object creation and memory allocation/release and so improve performance.

// the type parameter T selects the underlying data storage,
// e.g. a byte array byte[] or a Java NIO ByteBuffer
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
    // the core field: recyclerHandle
    // every instance carries a recyclerHandle field;
    // it is the intermediary that returns the instance to the (class-level)
    // object pool of the instance's class
    // the Handle's value field points back at this object
    private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;

    protected PoolChunk<T> chunk;
    protected long handle;
    protected T memory;
    protected int offset;
    protected int length;
    int maxLength;
    PoolThreadCache cache;
    private ByteBuffer tmpNioBuf;
    private ByteBufAllocator allocator;

    // other methods omitted
    ...

    // when the object's reference count drops to 0, release it;
    // recycle() puts the instance back into its class's object cache pool
    @Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            this.handle = -1;
            memory = null;
            tmpNioBuf = null;
            chunk.arena.free(chunk, handle, maxLength, cache);
            chunk = null;
            recycle();
        }
    }

    private void recycle() {
        recyclerHandle.recycle(this);
    }
}

PooledDirectByteBuf

The pooled implementation backed by direct (off-heap) memory.

RECYCLER is the object-instance cache pool of the PooledDirectByteBuf class;

newInstance takes a DirectByteBuf instance from the RECYCLER cache pool, calls reuse to reset the buf, and hands it to the caller.

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        @Override
        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

    ...

}

The reuse implementation used by PooledDirectByteBuf:
/**
 * Method must be called before reuse this {@link PooledByteBufAllocator}
 */
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}

PooledHeapByteBuf

The pooled implementation backed by heap memory.

class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        @Override
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

    ...

}

ByteBuf

@Test
public void myBytebuf() {

    // a ByteBuf backed by direct memory: initial capacity 8 bytes, max capacity 20 bytes
    //ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 20);

    // an unpooled heap-memory ByteBuf
    //ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
    // a pooled heap-memory ByteBuf
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
    print(buf);

    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    print(buf);

}

public static void print(ByteBuf buf) {
    // readable?
    System.out.println("buf.isReadable() :" + buf.isReadable());
    // the index reading starts at
    System.out.println("buf.readerIndex() :" + buf.readerIndex());
    // the number of readable bytes
    System.out.println("buf.readableBytes() " + buf.readableBytes());
    // writable?
    System.out.println("buf.isWritable() :" + buf.isWritable());
    // the index writing starts at
    System.out.println("buf.writerIndex() :" + buf.writerIndex());
    // the number of writable bytes
    System.out.println("buf.writableBytes() :" + buf.writableBytes());
    // the ByteBuf's current capacity
    System.out.println("buf.capacity() :" + buf.capacity());
    // the ByteBuf's maximum capacity
    System.out.println("buf.maxCapacity() :" + buf.maxCapacity());
    // backed by direct memory?
    System.out.println("buf.isDirect() :" + buf.isDirect());
    System.out.println("--------------");
}
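What to expect when this runs (Netty 4's default growth policy; worth verifying against your version): capacity starts at 8, jumps to the maxCapacity of 20 on the third write (12 bytes are needed and growth is capped at the max), the fifth write fills all 20 bytes, and the sixth throws IndexOutOfBoundsException because 24 bytes would exceed maxCapacity.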

Client

@Test
public void clientMode() throws Exception {
    // a thread group that manages Selectors, more or less
    NioEventLoopGroup thread = new NioEventLoopGroup(1);

    // client mode: Netty's NioSocketChannel
    NioSocketChannel client = new NioSocketChannel();

    // register the client with a Selector -- the equivalent of epoll_ctl(5, ADD, 3)
    thread.register(client);

    // reactive style: create a pipeline and add a handler class that is called back when a message arrives
    ChannelPipeline p = client.pipeline();
    p.addLast(new MyInHandler());

    // the reactor's asynchronous nature: connect registers the event asynchronously
    ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));
    // sync() blocks until the connect completes
    ChannelFuture sync = connect.sync();

    // build a ByteBuf holding a copy of the bytes
    ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());
    // send the message and flush (asynchronous)
    ChannelFuture send = client.writeAndFlush(buf);
    // block until the send succeeds
    send.sync();
    // close the channel and block until the close completes
    sync.channel().closeFuture().sync();

    System.out.println("client over....");

}
// this annotation lets multiple Clients share one Handler instance; without it a Handler can serve only one Client
@ChannelHandler.Sharable
class MyInHandler extends ChannelInboundHandlerAdapter {

    // registration callback
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client registered...");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active...");
    }

    // read callback: write the received content back to the Server
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // msg is what the client read produced by default, so the cast is safe
        ByteBuf buf = (ByteBuf) msg;
        // we cannot simply readCharSequence the string out: reading drains the buffer's content
        //CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
        // getCharSequence reads the Buffer's content without draining it
        CharSequence str = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);
        // print the content
        System.out.println(str);
        // write the still-populated Buffer back to the Server
        ctx.writeAndFlush(buf);
    }
}

Server

@Test
public void serverMode() throws Exception {
    // a thread group that manages Selectors, more or less
    NioEventLoopGroup thread = new NioEventLoopGroup(1);
    // server mode: NioServerSocketChannel
    NioServerSocketChannel server = new NioServerSocketChannel();

    // register the server with a Selector -- the equivalent of epoll_ctl(5, ADD, 3)
    thread.register(server);

    // reactive style: create a pipeline and add a handler class that is called back when a message arrives
    ChannelPipeline p = server.pipeline();

    // add the reactive handler (use one of the two variants below)
    // accept the client and register it with the selector (passing the Selector thread group and the Client's READ handler)
    //p.addLast(new MyAcceptHandler(thread, new MyInHandler()));
    // the improved variant (see the Optimization section below): add an extra init layer
    p.addLast(new MyAcceptHandler(thread, new ChannelInit()));


    // asynchronously register the listen event
    ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));
    // bind.sync() blocks, then we block waiting for the server's own close (in theory that never happens --
    // the point is to park the Main thread in WAIT while the Netty event-loop threads do the work, instead of exiting)
    bind.sync().channel().closeFuture().sync();
    System.out.println("server close....");
}
class MyAcceptHandler extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup selector;
    private final ChannelHandler handler;

    public MyAcceptHandler(EventLoopGroup thread, ChannelHandler myInHandler) {
        this.selector = thread;
        this.handler = myInHandler; //ChannelInit
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registered...");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // for a listen socket, "read" means: accept a client
        // for a connection socket, "read" means: socket R/W
        // wait -- where did I call accept?
        // Netty called it for us: msg already is the accepted client
        SocketChannel client = (SocketChannel) msg;
        // 1. attach the reactive handler
        // with the optimization discussed below: client::pipeline[ChannelInit]
        ChannelPipeline p = client.pipeline();
        p.addLast(handler);

        // 2. register it
        selector.register(client);
    }
}

Optimization

The @ChannelHandler.Sharable annotation makes every client use the same Handler instance. What if each client should get its own Handler instance?

// why have an init handler at all? you can do without one, but then MyInHandler has to be designed as a singleton
// the idea: wrap an outer Handler and override its register callback; at registration time it creates each client's own Handler
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel client = ctx.channel();
        ChannelPipeline p = client.pipeline();
        // add the real Handler on top of this bootstrap Handler
        p.addLast(new MyInHandler()); // 2. client::pipeline[ChannelInit, MyInHandler]
        // then remove this Handler itself
        ctx.pipeline().remove(this); // 3. client::pipeline[MyInHandler]
    }
}
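This hand-rolled ChannelInit is precisely the pattern behind Netty's built-in ChannelInitializer, which the next two examples use: its initChannel callback adds the real handlers to the pipeline, after which the initializer removes itself.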

Netty Client

@Test
public void nettyClient() throws InterruptedException, IOException {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture connect = bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new MyHandler());
                }
            })
            .connect(new InetSocketAddress("192.168.1.134", 9090));
    connect.sync();

    NioSocketChannel client = (NioSocketChannel) connect.channel();
    ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());
    ChannelFuture channelFuture = client.writeAndFlush(buf);
    channelFuture.sync();
}

Netty Server

@Test
public void nettyServer() throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    ServerBootstrap bootstrap = new ServerBootstrap();
    // a Server binds a Boss group and a Worker group; lazily reuse the same one here
    ChannelFuture bind = bootstrap.group(group, group)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new MyHandler());
                }
            })
            .bind(new InetSocketAddress("192.168.3.32", 9090));
    bind.sync().channel().closeFuture().sync();
}

A Hand-Written RPC Framework

// the interface invoked through the dynamic proxy
interface Say {
    String saySomething(String hello);
}

Consumer

// the consumer's entry point
public class MainConsumer {
    public static void main(String[] args) throws InterruptedException {
        // if this process also contains the Provider's service, the call goes local

        for (int i = 0; i < 20; i++) {
            String Isay = "~~~hello~~~~" + i;
            new Thread(() -> {
                Say say = ProxyUtils.proxy(Say.class);
                String recv = say.saySomething(Isay);
                System.out.println("I say : 【" + Isay + "】 and provider return :" + recv);
            }).start();
        }
    }
}

Dynamic Proxy

// JDK dynamic proxy
public class ProxyUtils {
    public static <T> T proxy(Class<T> clazz) {
        ClassLoader classLoader = clazz.getClassLoader();
        Class<?>[] classes = {clazz};
        return (T) Proxy.newProxyInstance(classLoader, classes, (proxy, method, args) -> {
            // how do we design the consumer's call to the provider?
            // take the service, method and arguments ==> wrap them into a message [content]
            Class<?>[] parameterTypes = method.getParameterTypes();
            String methodName = method.getName();
            String name = clazz.getName();

            Object object = Dispatcher.get(clazz.getName());
            // look the object up locally: if it is a local call, just invoke the method reflectively; otherwise go RPC
            if (object != null) {
                System.out.println("local call: FC...");
                Method m = clazz.getMethod(methodName, parameterTypes);
                return m.invoke(object, args);
            }
            System.out.println("remote call: RPC...");
            // the message body
            MsgBody msgBody = new MsgBody(name, methodName, args, parameterTypes);
            // send the request over the socket
            SynchronousQueue<Object> blockQueue = ClientFactory.getFactory().transport(msgBody);
            // when the response comes back from IO in the future, how does execution resume here?
            // take() blocks until the IO thread puts the result into the queue
            return blockQueue.take();
        });
    }
}

Protocol

public class MsgHeader implements Serializable {
    // the protocol id used when splitting packets
    public int protocol;
    // unique ID
    public String uuid;
    // length of the message body
    public long dataLen;

    // the protocol for Consumer -> Provider messages
    public static final int tranProtocol = 0x1010;
    // the protocol for Provider -> Consumer messages
    public static final int recvProtocol = 0x0101;

    public MsgHeader(int protocol, String uuid, long dataLen) {
        this.protocol = protocol;
        this.uuid = uuid;
        this.dataLen = dataLen;
    }

    // getters, setters and toString omitted
}
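A note on why the header can later be read as a fixed-size block: every MsgHeader serializes to the same number of bytes (an int, a long, and a uuid string that is always 36 characters), which is why the decoding code below can peel off a constant 143-byte header -- the size measured by printing headerBytes.length once.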
public class MsgBody implements Serializable {
    // the interface name of the remote RPC call
    String name;
    // the method name of the remote RPC call
    String method;
    // the arguments
    Object[] args;
    // the parameter types
    Class<?>[] parameterTypes;

    public MsgBody(String name, String method, Object[] args, Class<?>[] parameterTypes) {
        this.name = name;
        this.method = method;
        this.args = args;
        this.parameterTypes = parameterTypes;
    }

    // getters, setters and toString omitted
}
// utility class for serializing objects
public class SerializableUtils {
    static ByteArrayOutputStream baos = new ByteArrayOutputStream();

    public synchronized static byte[] toBytes(Object object) throws IOException {
        baos.reset();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(object);
        return baos.toByteArray();
    }

    public static Object toObject(byte[] bytes) throws IOException, ClassNotFoundException {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        ObjectInputStream ois = new ObjectInputStream(bais);
        return ois.readObject();
    }
}

Connection Pool

// the connection pool factory
public class ClientFactory {

    // the maximum number of Clients that may be created towards one Server
    int size = 10;
    private static final ClientFactory factory = new ClientFactory();

    ConcurrentHashMap<InetSocketAddress, ClientPool> pools = new ConcurrentHashMap<>();

    public static ClientFactory getFactory() {
        return factory;
    }

    private ClientFactory() {
    }

    // obtain a NioSocketChannel connection
    public NioSocketChannel getClientChannel(InetSocketAddress server) {
        ClientPool pool = pools.get(server);
        // lazy initialization
        if (pool == null) {
            pools.putIfAbsent(server, new ClientPool(size));
            pool = pools.get(server);
        }
        try {
            return pool.getClient(server);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public SynchronousQueue<Object> transport(MsgBody msgBody) throws Exception {
        // requestID + message; cache the callback locally
        SynchronousQueue<Object> blockQueue = new SynchronousQueue<>();
        String uuid = UUID.randomUUID().toString();
        CallBackUtils.add(uuid, blockQueue);

        // the wire format: [header][msgBody]

        byte[] bodyBytes = SerializableUtils.toBytes(msgBody);
        MsgHeader header = new MsgHeader(MsgHeader.tranProtocol, uuid, bodyBytes.length);
        byte[] headerBytes = SerializableUtils.toBytes(header);

        // print headerBytes once to learn how many bytes the header takes
        //System.out.println("headerBytes : " + headerBytes.length);

        ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(headerBytes.length + bodyBytes.length);
        byteBuf.writeBytes(headerBytes);
        byteBuf.writeBytes(bodyBytes);

        // connection pool :: take a connection
        // while obtaining a connection: created on first use, fetched directly afterwards
        InetSocketAddress server = new InetSocketAddress("localhost", 9090);
        NioSocketChannel clientChannel = getClientChannel(server);

        // 5. send --> out through IO --> through Netty (event driven)
        ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
        channelFuture.sync();

        return blockQueue;
    }
}
public class ClientPool {

    // the simplest possible pick strategy: a random slot
    Random random = new Random();

    // the array caching the connections
    NioSocketChannel[] clients;

    // clients are created lazily
    public ClientPool(int num) {

        clients = new NioSocketChannel[num];
    }

    // taking the lazy route here: one big lock on the whole pool
    public synchronized NioSocketChannel getClient(InetSocketAddress server) throws InterruptedException {
        int index = random.nextInt(clients.length);
        NioSocketChannel client = clients[index];
        // if the client exists, return it -- note that several threads may end up holding the same client
        if (client != null && client.isActive()) {
            return client;
        }
        // if the client does not exist, create it, then return it
        clients[index] = create(server);
        return clients[index];
    }

    public NioSocketChannel create(InetSocketAddress server) throws InterruptedException {
        // a worker group with a single EventLoop
        NioEventLoopGroup worker = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        ChannelFuture connect = bootstrap.group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // the packet-splitting protocol decoder, explained in detail below
                        pipeline.addLast(new ProtocolDecoder());
                        // the Handler that dispatches the callback
                        pipeline.addLast(new ConsumerHandler());
                    }
                })
                .connect(server);
        connect.sync();
        return (NioSocketChannel) connect.channel();
    }
}

Provider

// the Provider's entry point
public class MainProvider {
    public static void main(String[] args) throws InterruptedException {

        // an EventLoop group with 1 thread for the Selector ACCEPT work
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        // a worker group with 20 Selectors handling R/W
        NioEventLoopGroup worker = new NioEventLoopGroup(20);
        ServerBootstrap bootstrap = new ServerBootstrap();
        ChannelFuture bind = bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        System.out.println("server accept client port: " + ch.remoteAddress().getPort());

                        ChannelPipeline pipeline = ch.pipeline();
                        // 1. the naive approach
                        //pipeline.addLast(new ProviderRequestHandler());

                        // 2. the improved approach
                        pipeline.addLast(new ProtocolDecoder());
                        pipeline.addLast(new ProviderHandler());
                    }
                })
                .bind(new InetSocketAddress("localhost", 9090));

        bind.sync().channel().closeFuture().sync();

    }
}

Implementation Class

public class MySay implements Say {

    @Override
    public String saySomething(String hello) {
        return "I say Hello back";
    }
}
The Provider's naive Handler:
1. After reading a ByteBuf, split the data on the header size; a test print earlier showed that a serialized MsgHeader object is a 143-byte array.
2. So read the first 143 bytes and deserialize them into a MsgHeader object.
3. Take the body size from the MsgHeader's dataLen field.
4. Then read dataLen more bytes and deserialize them into a MsgBody object.
// the naive Handler
public class ProviderRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object buf) throws Exception {
        ByteBuf byteBuf = (ByteBuf) buf;

        if (byteBuf.readableBytes() > 143) {
            byte[] headBytes = new byte[143];
            byteBuf.readBytes(headBytes); // read the 143 header bytes
            MsgHeader header = (MsgHeader) SerializableUtils.toObject(headBytes);
            System.out.println(header);
            if (byteBuf.readableBytes() >= header.dataLen) {
                byte[] bodyBytes = new byte[(int) header.dataLen];
                byteBuf.readBytes(bodyBytes);
                MsgBody msgBody = (MsgBody) SerializableUtils.toObject(bodyBytes);
                System.out.println(msgBody);
            }
        }
    }
}

Food for thought: does this design break under concurrency? Does one ByteBuf object always contain exactly one complete packet?

The Decoder

Following the idea above, we need to split packets ourselves and buffer any incomplete packet so it can be stitched onto the next chunk. Netty of course anticipated this and encapsulated it as a decoder: it performs the packet splitting and adds each complete packet to a List<Object> out, from which they are passed one by one to the next Handler.

/**
 * decodes the raw bytes according to the different protocols
 */
public class ProtocolDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {

        // one buffer may hold several packets, so loop with while
        while (byteBuf.readableBytes() > 143) {
            byte[] headBytes = new byte[143];
            // use getBytes to peek at the header without moving readerIndex
            byteBuf.getBytes(byteBuf.readerIndex(), headBytes);
            MsgHeader header = (MsgHeader) SerializableUtils.toObject(headBytes);

            // packet splitting: only read when the readable bytes contain one full msgBody;
            // otherwise break out of the loop and wait for the next chunk to be appended
            if (byteBuf.readableBytes() - headBytes.length >= header.dataLen) {
                // the peek above did not move readerIndex, so skip past the header now
                byteBuf.skipBytes(headBytes.length);

                // read the body into a byte array first, then deserialize per protocol
                byte[] msgBytes = new byte[(int) header.dataLen];
                byteBuf.readBytes(msgBytes);

                // a Client -> Server packet is split under the tranProtocol
                if (header.protocol == MsgHeader.tranProtocol) {

                    MsgBody msgBody = (MsgBody) SerializableUtils.toObject(msgBytes);
                    // split into header + MsgBody, add to out, pass to the next Handler
                    out.add(new ProtocolMessage(header, msgBody));
                } else if (header.protocol == MsgHeader.recvProtocol) {
                    // a Server -> Client response packet is split under the recvProtocol
                    Object result = SerializableUtils.toObject(msgBytes);
                    // split into header + result, add to out, pass to the next Handler
                    out.add(new ProtocolMessage(header, result));
                }
            } else {
                break;
            }
        }
    }
}
public class ProtocolMessage {
    MsgHeader header;
    MsgBody msgBody;
    Object result;

    // the protocol envelope the Consumer sends
    public ProtocolMessage(MsgHeader header, MsgBody msgBody) {
        this.header = header;
        this.msgBody = msgBody;
    }

    // the protocol envelope the Provider sends
    public ProtocolMessage(MsgHeader header, Object result) {
        this.header = header;
        this.result = result;
    }
}
Here lives the actual channelRead IO logic. Suppose the processing is done and we are about to reply to the client -- what must we watch out for?
1. This is an RPC, so the response must carry back the uuid that came in the Header!!!!
2. Mind the RPC wire-protocol field; the client side has to solve the same decoding problem.
3. Then the actual business processing.
Strategies for where to run the business logic:
1. Handle IO, business work and the response right in the current method.
   Drawback: processSelectorKeys and runTask (the business work) are welded together; other Channels have to wait behind it.

2. Create your own thread pool.
   Caveat: the current thread and the business-processing thread are definitely not the same thread.

3. Hand the business logic to another EventLoop; every EventLoop owns one thread, one Selector, and one task queue.
   Code: ctx.executor().parent().next().execute()
   Benefit: decouples processSelectorKeys from runAllTasks and spreads the runAllTasks load across the other Selectors.
   Caveat: the current thread and the business thread may differ; the business may run on another EventLoop's single thread.

4. Use Netty's own EventLoop to process the business work and the response.
   Code: ctx.executor().execute();
   Effect: all processSelectorKeys are handled first, then runAllTasks executes them in order.
   Caveat: the current thread and the business-processing thread are definitely the same thread -- one EventLoop's single thread handles both.

public class ProviderHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object out) throws Exception {
        // ByteToMessageDecoder hands the split packets to the next handler one at a time
        ProtocolMessage pmsg = (ProtocolMessage) out;

        // the current (IO) thread
        String ioThreadName = Thread.currentThread().getName();

        // invoke the target method reflectively
        ctx.executor().parent().next().execute(() -> {
            String taskThreadName = Thread.currentThread().getName();
            String param = (String) pmsg.msgBody.args[0];
            //String result = ioThreadName + " recv say : 【" + param + "】 and send by " + taskThreadName;

            Object result = null;
            try {
                // reflective dispatch
                Object object = Dispatcher.get(pmsg.msgBody.name);
                Class<?> clazz = object.getClass();
                Method m = clazz.getMethod(pmsg.msgBody.method, pmsg.msgBody.parameterTypes);
                result = m.invoke(object, pmsg.msgBody.args);

                // write the result back
                byte[] resultBytes = SerializableUtils.toBytes(result);
                MsgHeader header = new MsgHeader(MsgHeader.recvProtocol, pmsg.header.uuid, resultBytes.length);

                byte[] headerBytes = SerializableUtils.toBytes(header);
                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(headerBytes.length + resultBytes.length);
                byteBuf.writeBytes(headerBytes);
                byteBuf.writeBytes(resultBytes);
                ctx.writeAndFlush(byteBuf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
// after the Provider sends its reply, back on the Consumer side this Handler receives it
// the Consumer also uses the decoder, so what arrives here is one complete packet; the callback wakes the blocked thread
public class ConsumerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object out) throws Exception {
        // 1. the decoded object
        ProtocolMessage pmsg = (ProtocolMessage) out;
        // 2. look the callback up by uuid and fire it
        CallBackUtils.callback(pmsg.header.uuid, pmsg.result);
    }
}
public class CallBackUtils {
    static ConcurrentHashMap<String, SynchronousQueue<Object>> map = new ConcurrentHashMap<>();

    public static void add(String uuid, SynchronousQueue queue) {
        map.put(uuid, queue);
    }

    // put the result into the queue so the Consumer's calling thread stops blocking
    public static void callback(String uuid, Object result) throws InterruptedException {
        SynchronousQueue queue = map.get(uuid);
        queue.put(result);
        // remove the consumed entry, otherwise the map keeps growing
        map.remove(uuid);
    }
}
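The SynchronousQueue is the synchronization point of the whole RPC: it has no capacity, so the proxy's blockQueue.take() parks the calling thread until the IO thread's queue.put(result) hands the value across; at that moment both sides continue.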

Summary

Last updated: 2021-02-07 16:43

Original link: https://midkuro.gitee.io/2020/10/29/io-base/
