思考集结处

vuePress-theme-reco 思考集结处    2024
思考集结处 思考集结处

Choose mode

  • dark
  • auto
  • light
首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
author-avatar

思考集结处

43

文章

18

标签

首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
  • Java相关技术
  • 网络编程相关概念
  • 同步阻塞IO模型
  • 同步非阻塞IO模型
  • 多路复用IO模型
  • 异步IO模型
  • 多线程源码分析
  • 线程池
  • Spring 事件监听
  • Spring 重试机制
  • Spring 重试机制之listeners参数

多路复用IO模型

vuePress-theme-reco 思考集结处    2024

多路复用IO模型

思考集结处 2021-11-13 网络

有上上篇IO模型中的,多路复用IO模型,我们能够知道,应用程序线程调用select直到接收到socket可读, 等待内核准备好数据好发起系统调用。

# 实现

使用java来实现多路复用IO模型

# 客户端实现

/**
 * @author:triumphxx
 * @Date:2021/11/20
 * @Time:10:31 上午
 * @微信公众号:北漂码农有话说
 * @网站:http://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: 多路复用IO模型客户端
 **/
public class MultiplexerClient {

    public static void main(String[] args) throws IOException {
        //1.创建SocketChannel
        SocketChannel sc = SocketChannel.open();
        //2.创建Selector
        Selector sel = Selector.open();
        try {
            sc.configureBlocking(false);
            //3.关联SocketChannel和Socket,socket绑定到本机端口
            sc.socket().bind(new InetSocketAddress("localhost",9090));
            //4.注册到Selector,感兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE
            sc.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            int i = 0;
            boolean written = false, done = false;

            ByteBuffer buffer = ByteBuffer.allocate(16);
            while(!done) {
                sel.select();
                //5.从选择器中获取所有注册的通道信息(SelectionKey作为标识)
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while(it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    //6.获取通道,此处即为上边创建的channel
                    sc = (SocketChannel)key.channel();
                    //7.判断SelectorKey对应的channel发生的事件是否socket连接,并且还没有连接
                    if(key.isConnectable() && !sc.isConnected()) {
                        InetAddress addr = InetAddress.getByName(null);
                        //连接addr和port对应的服务器
                        boolean success = sc.connect(new InetSocketAddress(addr, 9090));
                        if(!success) {
                            sc.finishConnect();
                        }
                    }
                    //8.读与写是非阻塞的:客户端写一个信息到服务器,服务器发送一个信息到客户端,客户端再读
                    if(key.isReadable() && written) {
                        if(sc.read((ByteBuffer)buffer.clear()) > 0) {
                            written = false;
                            String response = buffer.flip().toString();
                            System.out.println(response);
                            if(response.indexOf("END") != -1) {
                                done = true;
                            }
                        }
                    }
                    if(key.isWritable() && !written) {
                        if(i < 10) {
                            sc.write(ByteBuffer.wrap(new String("howdy " + i + "\n").getBytes()));
                        }else if(i == 10){
                            sc.write(ByteBuffer.wrap("END".getBytes()));
                        }
                        written = true;
                        i++;
                    }
                }
            }
        } finally {
            sc.close();
            sel.close();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

由代码可以知道:1、创建SocketChannel通道,注册到选择器,刚兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE 2、客户端sel.select()不会阻塞,对注册通道不断的遍历,并且每次都可写。原因是OP_WRITE事件会持续生效,即只要连接存在就可以写,不管服务端是否有返回

# 服务端实现

/**
 * @author:triumphxx
 * @Date:2021/11/20
 * @Time:10:31 上午
 * @微信公众号:北漂码农有话说
 * @网站:http://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: 多路复用IO模型服务端
 **/
public class MultiplexerServer {
    public static void main(String[] args) throws IOException {

        String encoding = System.getProperty("file.encoding");
        ByteBuffer buffer = ByteBuffer.allocate(16);
        SocketChannel ch = null;//Socket对应的channel
        //1.创建ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //2.创建选择器Selector
        Selector sel = Selector.open();

        try {
            //3.设置ServerSocketChannel通道为非阻塞
            ssc.configureBlocking(false);
            //4.ServerSocketChannel关联Socket,用于监听连接,使用本地ip和port
            //注意:Socket也对通道进行了改造,直接调Socket.getChannel()将返回bull,除非通过下边与通道关联
            //the expression (ssc.socket().getChannel() != null) is true
            ssc.socket().bind(new InetSocketAddress("localhost",9090));
            //5.将通道注册到Selector,感兴趣的事件为  连接  事件
            ssc.register(sel, SelectionKey.OP_ACCEPT);

            while(true) {
                //6.没有事件发生时,一直阻塞等待
                sel.select();
                //7.有事件发生时,获取Selector中所有SelectorKey(持有选择器与通道的关联关系)。
                //由于基于操作系统的poll()方法,当有事件发生时,只返回事件个数,无法确定具体通道,故只能对所有注册的通道进行遍历。
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                //8.遍历所有SelectorKey,处理事件
                while(it.hasNext()) {
                    SelectionKey sKey = it.next();
                    it.remove();//防止重复处理
                    //9.判断SelectorKey对应的channel发生的事件是否socket连接
                    if(sKey.isAcceptable()) {
                        //10.与ServerSocket.accept()方法相似,接收到该通道套接字的连接,返回SocketChannel,与客户端进行交互
                        ch = ssc.accept();
                        System.out.println(
                                "Accepted connection from:" + ch.socket());
                        //11.设置该SocketChannel为非阻塞模式
                        ch.configureBlocking(false);
                        //12.将该通道注册到Selector中,感兴趣的事件为OP_READ(读)
                        ch.register(sel, SelectionKey.OP_READ);
                    }else {
                        //13.发生非连接事件,此处为OP_READ事件。SelectorKey获取注册的SocketChannel,用于读写
                        ch = (SocketChannel)sKey.channel();
                        //14.将数据从channel读到ByteBuffer中
                        ch.read(buffer);
                        CharBuffer cb = (CharBuffer) buffer.flip();
                        String response = cb.toString();
                        System.out.print("Echoing : " + response);
                        //15.再将获取到的数据会写给客户端
                        ch.write((ByteBuffer)buffer.rewind());
                        if(response.indexOf("END") != -1) {
                            ch.close();
                        }
                        buffer.clear();
                    }
                }
            }
        } finally {
            if(ch != null) {
                ch.close();
            }
            ssc.close();
            sel.close();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

由代码可以知道:1。创建ServerSocketChannel和Selector,设置通道非阻塞,并与服务端的Socket绑定, 2、注册 ServerSocketChannel到Selector,感兴趣的事件为OP_CONNECT(获取连接) 3、select()方法阻塞等待,直到有事件发生 4、遍历Selector中的所有注册事件,通过SelectorKey维护Selector和Channel关联关系 5、如果是连接事件,则调ServerSocketChannel.accept()方法获取SocketChannel,与客户端交互 6、如果是读事件,则通过SelectorKey中获取SocketChannel,读写数据

# 优点及缺点

优点:绕过了I/O在操作系统层面的accept()方法的阻塞问题

# 适用场景

IO多路复用是专门用来解决IO事件的,故而IO密集型程序适合使用IO多路复用

# IO多路复用方式

实现IO多路复用有三种实现方式:select、poll、epoll

# Select优缺点

  • 优点

    • select的可移植性好,在某些unix下不支持poll。
    • select对超时值提供了很好的精度,精确到微秒,而poll式毫秒。
  • 缺点

    • 单个进程可监视的fd数量被限制,默认是1024。
    • 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
    • 对fd进行扫描时是线性扫描,fd剧增后,IO效率降低,每次调用都对fd进行线性扫描遍历,随着fd的增加会造成遍历速度慢的问题。
    • select函数超时参数在返回时也是未定义的,考虑到可移植性,每次超时之后进入下一个select之前都要重新设置超时参数

# poll的优缺点

  • 优点

    • 不要求计算最大文件描述符的大小。
    • 应付大数量的文件描述符时比select要快。
    • 没有最大连接数的限制是基于链表存储的。
  • 缺点

    • 大量的fd数组被整体复制于内核态和用户态之间,而不管这样的复制是不是有意义。
    • 同select相同的是调用结束后需要轮询来获取就绪描述符。

# epoll的优点

  • 支持一个进程打开大数目的socket描述符
  • IO效率不随FD数目增加而线性下降
  • 使用mmap加速内核与用户空间的消息传递

mmap 可以理解为用户态和内核态的一个缓存层

# 小结

以上就是java的多路复用编程模型,以及多路复用实现的方式的简单介绍,很简单的一个小例子,希望对你有所帮助。代码传送门

我是思考集结处欢迎你的关注
看板娘