多路复用IO模型
思考集结处 2021-11-13 网络
有上上篇IO
模型中的,多路复用IO
模型,我们能够知道,应用程序线程调用select
直到接收到socket
可读,
等待内核准备好数据好发起系统调用。
# 实现
使用java
来实现多路复用IO
模型
# 客户端实现
/**
* @author:triumphxx
* @Date:2021/11/20
* @Time:10:31 上午
* @微信公众号:北漂码农有话说
* @网站:http://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc: 多路复用IO模型客户端
**/
public class MultiplexerClient {
public static void main(String[] args) throws IOException {
//1.创建SocketChannel
SocketChannel sc = SocketChannel.open();
//2.创建Selector
Selector sel = Selector.open();
try {
sc.configureBlocking(false);
//3.关联SocketChannel和Socket,socket绑定到本机端口
sc.socket().bind(new InetSocketAddress("localhost",9090));
//4.注册到Selector,感兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE
sc.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
int i = 0;
boolean written = false, done = false;
ByteBuffer buffer = ByteBuffer.allocate(16);
while(!done) {
sel.select();
//5.从选择器中获取所有注册的通道信息(SelectionKey作为标识)
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
//6.获取通道,此处即为上边创建的channel
sc = (SocketChannel)key.channel();
//7.判断SelectorKey对应的channel发生的事件是否socket连接,并且还没有连接
if(key.isConnectable() && !sc.isConnected()) {
InetAddress addr = InetAddress.getByName(null);
//连接addr和port对应的服务器
boolean success = sc.connect(new InetSocketAddress(addr, 9090));
if(!success) {
sc.finishConnect();
}
}
//8.读与写是非阻塞的:客户端写一个信息到服务器,服务器发送一个信息到客户端,客户端再读
if(key.isReadable() && written) {
if(sc.read((ByteBuffer)buffer.clear()) > 0) {
written = false;
String response = buffer.flip().toString();
System.out.println(response);
if(response.indexOf("END") != -1) {
done = true;
}
}
}
if(key.isWritable() && !written) {
if(i < 10) {
sc.write(ByteBuffer.wrap(new String("howdy " + i + "\n").getBytes()));
}else if(i == 10){
sc.write(ByteBuffer.wrap("END".getBytes()));
}
written = true;
i++;
}
}
}
} finally {
sc.close();
sel.close();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
由代码可以知道:1、创建SocketChannel通道,注册到选择器,刚兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE 2、客户端sel.select()不会阻塞,对注册通道不断的遍历,并且每次都可写。原因是OP_WRITE事件会持续生效,即只要连接存在就可以写,不管服务端是否有返回
# 服务端实现
/**
* @author:triumphxx
* @Date:2021/11/20
* @Time:10:31 上午
* @微信公众号:北漂码农有话说
* @网站:http://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc: 多路复用IO模型服务端
**/
public class MultiplexerServer {
public static void main(String[] args) throws IOException {
String encoding = System.getProperty("file.encoding");
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel ch = null;//Socket对应的channel
//1.创建ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
//2.创建选择器Selector
Selector sel = Selector.open();
try {
//3.设置ServerSocketChannel通道为非阻塞
ssc.configureBlocking(false);
//4.ServerSocketChannel关联Socket,用于监听连接,使用本地ip和port
//注意:Socket也对通道进行了改造,直接调Socket.getChannel()将返回bull,除非通过下边与通道关联
//the expression (ssc.socket().getChannel() != null) is true
ssc.socket().bind(new InetSocketAddress("localhost",9090));
//5.将通道注册到Selector,感兴趣的事件为 连接 事件
ssc.register(sel, SelectionKey.OP_ACCEPT);
while(true) {
//6.没有事件发生时,一直阻塞等待
sel.select();
//7.有事件发生时,获取Selector中所有SelectorKey(持有选择器与通道的关联关系)。
//由于基于操作系统的poll()方法,当有事件发生时,只返回事件个数,无法确定具体通道,故只能对所有注册的通道进行遍历。
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
//8.遍历所有SelectorKey,处理事件
while(it.hasNext()) {
SelectionKey sKey = it.next();
it.remove();//防止重复处理
//9.判断SelectorKey对应的channel发生的事件是否socket连接
if(sKey.isAcceptable()) {
//10.与ServerSocket.accept()方法相似,接收到该通道套接字的连接,返回SocketChannel,与客户端进行交互
ch = ssc.accept();
System.out.println(
"Accepted connection from:" + ch.socket());
//11.设置该SocketChannel为非阻塞模式
ch.configureBlocking(false);
//12.将该通道注册到Selector中,感兴趣的事件为OP_READ(读)
ch.register(sel, SelectionKey.OP_READ);
}else {
//13.发生非连接事件,此处为OP_READ事件。SelectorKey获取注册的SocketChannel,用于读写
ch = (SocketChannel)sKey.channel();
//14.将数据从channel读到ByteBuffer中
ch.read(buffer);
CharBuffer cb = (CharBuffer) buffer.flip();
String response = cb.toString();
System.out.print("Echoing : " + response);
//15.再将获取到的数据会写给客户端
ch.write((ByteBuffer)buffer.rewind());
if(response.indexOf("END") != -1) {
ch.close();
}
buffer.clear();
}
}
}
} finally {
if(ch != null) {
ch.close();
}
ssc.close();
sel.close();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
由代码可以知道:1。创建ServerSocketChannel和Selector,设置通道非阻塞,并与服务端的Socket绑定, 2、注册 ServerSocketChannel到Selector,感兴趣的事件为OP_CONNECT(获取连接) 3、select()方法阻塞等待,直到有事件发生 4、遍历Selector中的所有注册事件,通过SelectorKey维护Selector和Channel关联关系 5、如果是连接事件,则调ServerSocketChannel.accept()方法获取SocketChannel,与客户端交互 6、如果是读事件,则通过SelectorKey中获取SocketChannel,读写数据
# 优点及缺点
优点:绕过了I/O
在操作系统层面的accept()方法的阻塞问题
# 适用场景
IO
多路复用是专门用来解决IO
事件的,故而IO密
集型程序适合使用IO
多路复用
# IO多路复用方式
实现IO
多路复用有三种实现方式:select
、poll
、epoll
# Select优缺点
优点
- select的可移植性好,在某些unix下不支持poll。
- select对超时值提供了很好的精度,精确到微秒,而poll式毫秒。
缺点
- 单个进程可监视的fd数量被限制,默认是1024。
- 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
- 对fd进行扫描时是线性扫描,fd剧增后,IO效率降低,每次调用都对fd进行线性扫描遍历,随着fd的增加会造成遍历速度慢的问题。
- select函数超时参数在返回时也是未定义的,考虑到可移植性,每次超时之后进入下一个select之前都要重新设置超时参数
# poll的优缺点
优点
- 不要求计算最大文件描述符的大小。
- 应付大数量的文件描述符时比select要快。
- 没有最大连接数的限制是基于链表存储的。
缺点
- 大量的fd数组被整体复制于内核态和用户态之间,而不管这样的复制是不是有意义。
- 同select相同的是调用结束后需要轮询来获取就绪描述符。
# epoll的优点
- 支持一个进程打开大数目的socket描述符
- IO效率不随FD数目增加而线性下降
- 使用mmap加速内核与用户空间的消息传递
mmap 可以理解为用户态和内核态的一个缓存层
# 小结
以上就是java
的多路复用编程模型,以及多路复用实现的方式的简单介绍,很简单的一个小例子,希望对你有所帮助。代码传送门