思考集结处

vuePress-theme-reco 思考集结处    2024
思考集结处 思考集结处

Choose mode

  • dark
  • auto
  • light
首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
author-avatar

思考集结处

43

文章

18

标签

首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
  • Java相关技术
  • 网络编程相关概念
  • 同步阻塞IO模型
  • 同步非阻塞IO模型
  • 多路复用IO模型
  • 异步IO模型
  • 多线程源码分析
  • 线程池
  • Spring 事件监听
  • Spring 重试机制
  • Spring 重试机制之listeners参数

异步IO模型

vuePress-theme-reco 思考集结处    2024

异步IO模型

思考集结处 2021-11-12 网络

有上上篇IO模型中的,同步非阻塞IO模型,我们能够知道,用户线程发送请求后,内核一马上返回响应结果 待内核将数据处理好后,发送通知给用户线程。

# 实现

使用java来实现异步IO模型,即我们所说的AIO的模型

# 客户端实现

/**
 * @author:triumphxx
 * @Date:2021/11/12
 * @Time:11:03 下午
 * @微信公众号:北漂码农有话说
 * @网站:http://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: AIO编程模型客户端
 **/
public class AIOClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        //打开一个客户端通道
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        //与服务端建立连接
        channel.connect(new InetSocketAddress("127.0.0.1",9090));
        try {
            //向服务端发送数据
            channel.write(ByteBuffer.wrap("Hello,我是客户端".getBytes())).get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        try {
            //从服务端读取返回的数据
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            //将通道中的数据写入缓冲Buffer
            channel.read(byteBuffer).get();
            byteBuffer.flip();
            String result = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
            //服务端返回的数据
            System.out.println("客户端收到服务端返回的内容:"+result);
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

由客户端的代码可以知道:首先我们将异步的客户端通道打开,然后建立与服务端的连接、给服务端发送信息、接收服务端返回的信息。

# 服务端实现

/**
 * @author:triumphxx
 * @Date:2021/11/12
 * @Time:11:03 下午
 * @微信公众号:北漂码农有话说
 * @网站:http://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: AIO编程模型服务端
 **/
public class AIOServer {

    public AsynchronousServerSocketChannel serverSocketChannel;

    public static void main(String[] args) throws Exception {
        //启动服务器,并监听客户端
        new AIOServer().listen();
        //因为是异步IO执行,让主线程睡眠但不关闭
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void listen() throws Exception {
        //打开一个服务端通道
        serverSocketChannel = AsynchronousServerSocketChannel.open();
        //监听9090端口
        serverSocketChannel.bind(new InetSocketAddress(9090));
        //监听
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
            @Override
            public void completed(AsynchronousSocketChannel client, AIOServer attachment) {
                try {
                    if (client.isOpen()) {
                        System.out.println("接收到新的客户端的连接,地址:" + client.getRemoteAddress());
                        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        //读取客户端发送的数据
                        client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
                            @Override
                            public void completed(Integer result, AsynchronousSocketChannel attachment) {
                                try {
                                    //读取请求,处理客户端发送的数据
                                    byteBuffer.flip();
                                    String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                                    System.out.println("服务端接受到客户端发来的数据:" + content);
                                    //向客户端发送数据
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.put("服务端发送的数据".getBytes());
                                    writeBuffer.flip();
                                    attachment.write(writeBuffer).get();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                                try {
                                    exc.printStackTrace();
                                    attachment.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
                    attachment.serverSocketChannel.accept(attachment, this);
                }
            }

            @Override
            public void failed(Throwable exc, AIOServer attachment) {
                exc.printStackTrace();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

由服务端的代码可以知道:首先我们打开一个服务端通道、监听端口、接收客户端的请求、处理请求、返回响应。

# 优点及缺点

优点:可靠性更好,吞吐量高、异步、编程模型简单

# 适用场景

适用于连接比较多,连接比较长的场景。视频流

# 小结

以上就是java的AIO编程模型,很简单的一个小例子,希望对你有所帮助。代码传送门

我是思考集结处欢迎你的关注
看板娘