思考集结处

vuePress-theme-reco 思考集结处    2024
思考集结处 思考集结处

Choose mode

  • dark
  • auto
  • light
首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
author-avatar

思考集结处

43

文章

18

标签

首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
  • 源码分析
  • Tomcat的源码编译
  • Tomcat 架构分析
  • Tomcat 生命周期原理
  • Tomcat 启动流程

Tomcat 组件分析

vuePress-theme-reco 思考集结处    2024

Tomcat 组件分析

思考集结处 2021-10-31 Tomcat

Tomcat 组件分析

# Tomcat 组件分析

由前面的文章,我们可以知道 Tomcat 的的几大核心组件,包括Server 、Service 、Connector 、 Engine 、Host,所以我们想要 对Tomcat全面的进行了解,必须对这些组件进行深入的了解

# Server组件

public interface Server extends Lifecycle {}
1

# 实现类StandardServer

public final class StandardServer extends LifecycleMBeanBase implements Server {
    //构造器
    public StandardServer() {
        super();
        //JNDi 相关操作
        globalNamingResources = new NamingResourcesImpl();
        globalNamingResources.setContainer(this);

        if (isUseNaming()) {
            namingContextListener = new NamingContextListener();
            addLifecycleListener(namingContextListener);
        } else {
            namingContextListener = null;
        }

    }
    
    @Override
    public void addService(Service service) {

        service.setServer(this);

        synchronized (servicesLock) {
            //复制数组。然后放置新的service 
            Service results[] = new Service[services.length + 1];
            System.arraycopy(services, 0, results, 0, services.length);
            results[services.length] = service;
            services = results;

            if (getState().isAvailable()) {
                try {
                    // 启动service组件
                    service.start();
                } catch (LifecycleException e) {
                    // Ignore
                }
            }

            // 触发service属性变化事件
            support.firePropertyChange("service", null, service);
        }
    }

    @Override
    public void removeService(Service service) {

        synchronized (servicesLock) {
            int j = -1;
            for (int i = 0; i < services.length; i++) {
                if (service == services[i]) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            try {
                services[j].stop();
            } catch (LifecycleException e) {
                // Ignore
            }
            int k = 0;
            Service results[] = new Service[services.length - 1];
            for (int i = 0; i < services.length; i++) {
                if (i != j) {
                    results[k++] = services[i];
                }
            }
            services = results;

            // Report this property change to interested listeners
            support.firePropertyChange("service", service, null);
        }

    }
    
    @Override
    public void await() {
        // 内嵌的tomcat 不需要自己管理
        if (port == -2) {
            return;
        }
        //执行本地变量阻塞等待
        if (port==-1) {
            try {
                awaitThread = Thread.currentThread();
                while(!stopAwait) {
                    try {
                        Thread.sleep( 10000 );
                    } catch( InterruptedException ex ) {
                        // continue and check the flag
                    }
                }
            } finally {
                awaitThread = null;
            }
            return;
        }

        try {
            // 启动创建服务端套接字
            awaitSocket = new ServerSocket(port, 1,
                    InetAddress.getByName(address));
        } catch (IOException e) {
            log.error("StandardServer.await: create[" + address
                    + ":" + port
                    + "]: ", e);
            return;
        }

        try {
            awaitThread = Thread.currentThread();
            while (!stopAwait) {
                ServerSocket serverSocket = awaitSocket;
                if (serverSocket == null) {
                    break;
                }

                Socket socket = null;
                StringBuilder command = new StringBuilder();
                try {
                    InputStream stream;
                    long acceptStartTime = System.currentTimeMillis();
                    try {
                        //接收客户端请求
                        socket = serverSocket.accept();
                        //设置超时事件
                        socket.setSoTimeout(10 * 1000);  // Ten seconds
                        //获取客户端发送的请求
                        stream = socket.getInputStream();
                    } catch (SocketTimeoutException ste) {
                        // This should never happen but bug 56684 suggests that
                        // it does.
                        log.warn(sm.getString("standardServer.accept.timeout",
                                Long.valueOf(System.currentTimeMillis() - acceptStartTime)), ste);
                        continue;
                    } catch (AccessControlException ace) {
                        log.warn(sm.getString("standardServer.accept.security"), ace);
                        continue;
                    } catch (IOException e) {
                        if (stopAwait) {
                            // Wait was aborted with socket.close()
                            break;
                        }
                        log.error(sm.getString("standardServer.accept.error"), e);
                        break;
                    }

                    // Read a set of characters from the socket
                    int expected = 1024; // Cut off to avoid DoS attack
                    while (expected < shutdown.length()) {
                        if (random == null) {
                            random = new Random();
                        }
                        expected += (random.nextInt() % 1024);
                    }
                    while (expected > 0) {
                        int ch = -1;
                        try {
                            ch = stream.read();
                        } catch (IOException e) {
                            log.warn(sm.getString("standardServer.accept.readError"), e);
                            ch = -1;
                        }
                        // Control character or EOF (-1) terminates loop
                        if (ch < 32 || ch == 127) {
                            break;
                        }
                        command.append((char) ch);
                        expected--;
                    }
                } finally {
                    // Close the socket now that we are done with it
                    try {
                        if (socket != null) {
                            socket.close();
                        }
                    } catch (IOException e) {
                        // Ignore
                    }
                }

                // 匹配 server.xml的shutdown
                boolean match = command.toString().equals(shutdown);
                if (match) {
                    log.info(sm.getString("standardServer.shutdownViaPort"));
                    break;
                } else {
                    log.warn(sm.getString("standardServer.invalidShutdownCommand", command.toString()));
                }
            }
        } finally {
            ServerSocket serverSocket = awaitSocket;
            awaitThread = null;
            awaitSocket = null;

            // Close the server socket and return
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    // Ignore
                }
            }
        }
    }

    // 重点方法
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Register global String cache
        // Note although the cache is global, if there are multiple Servers
        // present in the JVM (may happen when embedding) then the same cache
        // will be registered under multiple names
        onameStringCache = register(new StringCache(), "type=StringCache");

        // Register the MBeanFactory
        MBeanFactory factory = new MBeanFactory();
        factory.setContainer(this);
        onameMBeanFactory = register(factory, "type=MBeanFactory");

        // Register the naming resources
        globalNamingResources.init();

        // Populate the extension validator with JARs from common and shared
        // class loaders
        if (getCatalina() != null) {
            ClassLoader cl = getCatalina().getParentClassLoader();
            // Walk the class loader hierarchy. Stop at the system class loader.
            // This will add the shared (if present) and common class loaders
            while (cl != null && cl != ClassLoader.getSystemClassLoader()) {
                if (cl instanceof URLClassLoader) {
                    URL[] urls = ((URLClassLoader) cl).getURLs();
                    for (URL url : urls) {
                        if (url.getProtocol().equals("file")) {
                            try {
                                File f = new File (url.toURI());
                                if (f.isFile() &&
                                        f.getName().endsWith(".jar")) {
                                    ExtensionValidator.addSystemResource(f);
                                }
                            } catch (URISyntaxException | IOException e) {
                                // Ignore
                            }
                        }
                    }
                }
                cl = cl.getParent();
            }
        }
        // Initialize our defined Services
        // 初始化 service 组件
        for (Service service : services) {
            service.init();
        }
    }    
    // 重点方法
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();
        //注册到JMX
        onameStringCache = register(new StringCache(), "type=StringCache")
        MBeanFactory factory = new MBeanFactory();
        factory.setContainer(this);
        onameMBeanFactory = register(factory, "type=MBeanFactory");
        globalNamingResources.init();

        //
        if (getCatalina() != null) {
            //Common 类加载器
            ClassLoader cl = getCatalina().getParentClassLoader();
            //只有URLClassLoader类加载可以设定资源路径
            while (cl != null && cl != ClassLoader.getSystemClassLoader()) {
                if (cl instanceof URLClassLoader) {
                    URL[] urls = ((URLClassLoader) cl).getURLs();
                    for (URL url : urls) {
                        if (url.getProtocol().equals("file")) {
                            try {
                                File f = new File (url.toURI());
                                if (f.isFile() &&
                                        f.getName().endsWith(".jar")) {
                                    //添加校验器进行校验
                                    ExtensionValidator.addSystemResource(f);
                                }
                            } catch (URISyntaxException | IOException e) {
                                // Ignore
                            }
                        }
                    }
                }
                cl = cl.getParent();
            }
        }
        // 初始化 service 组件
        for (Service service : services) {
            service.init();
        }
    }
    //启动方法
    @Override
    protected void startInternal() throws LifecycleException {
        // 配置启动事件
        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        //设置为启动中
        setState(LifecycleState.STARTING);
        globalNamingResources.start();

        // 启动service组件
        synchronized (servicesLock) {
            for (Service service : services) {
                service.start();
            }
        }
    }

    // 停止方法
    @Override
    protected void stopInternal() throws LifecycleException {
        //设置为启动中
        setState(LifecycleState.STOPPING);
        //设置停止事件
        fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);

        // 停止Service
        for (Service service : services) {
            service.stop();
        }
        globalNamingResources.stop();
        stopAwait();
    }



    public void stopAwait() {
        stopAwait=true;
        Thread t = awaitThread;
        if (t != null) {
            //套接字关闭
            ServerSocket s = awaitSocket;
            if (s != null) {
                awaitSocket = null;
                try {
                    s.close();
                } catch (IOException e) {
                    // Ignored
                }
            }
            t.interrupt();
            try {
                //唤醒
                t.join(1000);
            } catch (InterruptedException e) {
                // Ignored
            }
        }
    }
    @Override
    protected void destroyInternal() throws LifecycleException {
        // 销毁Service
        for (Service service : services) {
            service.destroy();
        }

        globalNamingResources.destroy();
        //减除JMX的注册
        unregister(onameMBeanFactory);
        unregister(onameStringCache);

        super.destroyInternal();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376

# Service组件

/**
 * A <strong>Service</strong> is a group of one or more
 * <strong>Connectors</strong> that share a single <strong>Container(引擎)</strong>
 * to process their incoming requests.  This arrangement allows, for example,
 * a non-SSL and SSL connector to share the same population of web apps.
 * <p>
 */
public interface Service extends Lifecycle {}
1
2
3
4
5
6
7
8

# 类StandardService

public class StandardService extends LifecycleMBeanBase implements Service {
    
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        if (engine != null) {
            engine.init();
        }

        // Initialize any Executors
        for (Executor executor : findExecutors()) {
            if (executor instanceof JmxEnabled) {
                ((JmxEnabled) executor).setDomain(getDomain());
            }
            executor.init();
        }

        // Initialize mapper listener
        mapperListener.init();

        // Initialize our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    connector.init();
                } catch (Exception e) {
                    String message = sm.getString(
                            "standardService.connector.initFailed", connector);
                    log.error(message, e);

                    if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
                        throw new LifecycleException(message);
                    }
                }
            }
        }
    }
    @Override
    protected void startInternal() throws LifecycleException {

        if(log.isInfoEnabled()) {
            log.info(sm.getString("standardService.start.name", this.name));
        }
        setState(LifecycleState.STARTING);

        // Start our defined Container first
        if (engine != null) {
            synchronized (engine) {
                engine.start();
            }
        }

        synchronized (executors) {
            for (Executor executor: executors) {
                executor.start();
            }
        }

        mapperListener.start();

        // Start our defined Connectors second
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    // If it has already failed, don't try and start it
                    if (connector.getState() != LifecycleState.FAILED) {
                        connector.start();
                    }
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }
        }
    }

    @Override
    protected void stopInternal() throws LifecycleException {

        // Pause connectors first
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    connector.pause();
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.pauseFailed",
                            connector), e);
                }
                // Close server socket if bound on start
                // Note: test is in AbstractEndpoint
                connector.getProtocolHandler().closeServerSocketGraceful();
            }
        }

        if(log.isInfoEnabled()) {
            log.info(sm.getString("standardService.stop.name", this.name));
        }
        setState(LifecycleState.STOPPING);

        // Stop our defined Container second
        if (engine != null) {
            synchronized (engine) {
                engine.stop();
            }
        }

        // Now stop the connectors
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                if (!LifecycleState.STARTED.equals(
                        connector.getState())) {
                    // Connectors only need stopping if they are currently
                    // started. They may have failed to start or may have been
                    // stopped (e.g. via a JMX call)
                    continue;
                }
                try {
                    connector.stop();
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.stopFailed",
                            connector), e);
                }
            }
        }

        // If the Server failed to start, the mapperListener won't have been
        // started
        if (mapperListener.getState() != LifecycleState.INITIALIZED) {
            mapperListener.stop();
        }
        synchronized (executors) {
            for (Executor executor: executors) {
                executor.stop();
            }
        }
    }

    @Override
    protected void destroyInternal() throws LifecycleException {
        mapperListener.destroy();

        // Destroy our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    connector.destroy();
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.destroyFailed", connector), e);
                }
            }
        }

        // Destroy any Executors
        for (Executor executor : findExecutors()) {
            executor.destroy();
        }

        if (engine != null) {
            engine.destroy();
        }

        super.destroyInternal();
    }

  
    private String name = null;
    

    private Server server = null;
    protected final PropertyChangeSupport support = new PropertyChangeSupport(this);
    protected Connector connectors[] = new Connector[0];
    private final Object connectorsLock = new Object();
    protected final ArrayList<Executor> executors = new ArrayList<>();

    private Engine engine = null;

    private ClassLoader parentClassLoader = null;
    /**
     * Mapper.
     */
    protected final Mapper mapper = new Mapper();
    /**
     * Mapper listener.
     */
    protected final MapperListener mapperListener = new MapperListener(this);
    
    @Override
    public Mapper getMapper() {
        return mapper;
    }
    
    @Override
    public Engine getContainer() {
        return engine;
    }

    @Override
    public void setContainer(Engine engine) {
        Engine oldEngine = this.engine;
        if (oldEngine != null) {
            oldEngine.setService(null);
        }
        this.engine = engine;
        if (this.engine != null) {
            this.engine.setService(this);
        }
        if (getState().isAvailable()) {
            if (this.engine != null) {
                try {
                    this.engine.start();
                } catch (LifecycleException e) {
                    log.error(sm.getString("standardService.engine.startFailed"), e);
                }
            }
            // Restart MapperListener to pick up new engine.
            try {
                mapperListener.stop();
            } catch (LifecycleException e) {
                log.error(sm.getString("standardService.mapperListener.stopFailed"), e);
            }
            try {
                mapperListener.start();
            } catch (LifecycleException e) {
                log.error(sm.getString("standardService.mapperListener.startFailed"), e);
            }
            if (oldEngine != null) {
                try {
                    oldEngine.stop();
                } catch (LifecycleException e) {
                    log.error(sm.getString("standardService.engine.stopFailed"), e);
                }
            }
        }

        support.firePropertyChange("container", oldEngine, this.engine);
    }

    @Override
    public String getName() {
        return name;
    }
    
    @Override
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public Server getServer() {
        return this.server;
    }

    @Override
    public void setServer(Server server) {
        this.server = server;
    }
    
    @Override
    public void addConnector(Connector connector) {

        synchronized (connectorsLock) {
            connector.setService(this);
            Connector results[] = new Connector[connectors.length + 1];
            System.arraycopy(connectors, 0, results, 0, connectors.length);
            results[connectors.length] = connector;
            connectors = results;

            if (getState().isAvailable()) {
                try {
                    connector.start();
                } catch (LifecycleException e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }
            support.firePropertyChange("connector", null, connector);
        }

    }

    public ObjectName[] getConnectorNames() {
        ObjectName results[] = new ObjectName[connectors.length];
        for (int i=0; i<results.length; i++) {
            results[i] = connectors[i].getObjectName();
        }
        return results;
    }
    
    public void addPropertyChangeListener(PropertyChangeListener listener) {
        support.addPropertyChangeListener(listener);
    }
    
    @Override
    public Connector[] findConnectors() {
        return connectors;
    }
    
    @Override
    public void removeConnector(Connector connector) {

        synchronized (connectorsLock) {
            int j = -1;
            for (int i = 0; i < connectors.length; i++) {
                if (connector == connectors[i]) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            if (connectors[j].getState().isAvailable()) {
                try {
                    connectors[j].stop();
                } catch (LifecycleException e) {
                    log.error(sm.getString(
                            "standardService.connector.stopFailed",
                            connectors[j]), e);
                }
            }
            connector.setService(null);
            int k = 0;
            Connector results[] = new Connector[connectors.length - 1];
            for (int i = 0; i < connectors.length; i++) {
                if (i != j) {
                    results[k++] = connectors[i];
                }
            }
            connectors = results;

            // Report this property change to interested listeners
            support.firePropertyChange("connector", connector, null);
        }
    }
    
    public void removePropertyChangeListener(PropertyChangeListener listener) {
        support.removePropertyChangeListener(listener);
    }

    @Override
    public ClassLoader getParentClassLoader() {
        if (parentClassLoader != null) {
            return parentClassLoader;
        }
        if (server != null) {
            return server.getParentClassLoader();
        }
        return ClassLoader.getSystemClassLoader();
    }


    @Override
    public void setParentClassLoader(ClassLoader parent) {
        ClassLoader oldParentClassLoader = this.parentClassLoader;
        this.parentClassLoader = parent;
        support.firePropertyChange("parentClassLoader", oldParentClassLoader,
                                   this.parentClassLoader);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367

# Connector组件

Tomcat中支持的协议有三种,分别是HTTP、 AJP、APR,这三种协议操作的区别。特别说明:Tomcat默认的是NIO的模式,如下图:

# 协议处理器 ProtocolHandler

public interface ProtocolHandler {
    // 适配 servlet的适配器
    public Adapter getAdapter();
    
    public void setAdapter(Adapter adapter);
    
    // 执行器(线程池)并行处理连接
    public Executor getExecutor();
    
    public void init() throws Exception;
    
    public void start() throws Exception;
    
    public void pause() throws Exception;

    public void resume() throws Exception;
    
    public void stop() throws Exception;
    
    public void destroy() throws Exception;
    
    public void closeServerSocketGraceful();
    
    public boolean isAprRequired();

    public boolean isSendfileSupported();
    
    public void addSslHostConfig(SSLHostConfig sslHostConfig);
    public SSLHostConfig[] findSslHostConfigs();

    //协议升级
    public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol);
    public UpgradeProtocol[] findUpgradeProtocols();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# AbstractProtocol协议处理模板

该类是一个门面,其实内部是Endpoint在服务。

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    
    //构建Endpoint
    public AbstractProtocol(AbstractEndpoint<S> endpoint) {
        this.endpoint = endpoint;
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
    //协议处理器
    protected abstract Processor createProcessor();
    protected abstract Processor createUpgradeProcessor(
            SocketWrapperBase<?> socket,
            UpgradeToken upgradeToken);
    
    // 初始化端点
    @Override
    public void init() throws Exception {
    
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        endpoint.setDomain(domain);

        endpoint.init();
    }
    // 启动端点
    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        }
        endpoint.start();
        // Servlet 的异步处理,检测异步超时的线程
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# Endpoint原理

# AbstractEndpoint 端点管理器

@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;
				//缓存处理器
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        //缓存事件
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        //缓存通道
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
      	//创建线程池 默认200个
        if (getExecutor() == null) {
            createExecutor();
        }

        initializeConnectionLatch();

        // 创建最小两个Poller
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        startAcceptorThreads();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# NioEndpoint 原理

两大核心:Acceptor-请求接收器,Poller-处理socket的读写,并且将任务丢给worker来完成

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;
        while (running) {
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;
            try {
                countUpOrAwaitConnection();
                SocketChannel socket = null;
                try {
                    //阻塞 拿到请求
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    countDownConnection();
                    if (running) {
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        throw ioe;
                    } else {
                        break;
                    }
                }
                errorDelay = 0;
                if (running && !paused) {
										//处理请求
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
    state = AcceptorState.ENDED;
}
 
  
public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

    private volatile boolean close = false;
    private long nextExpiration = 0;//optimize expiration handling

    private AtomicLong wakeupCounter = new AtomicLong(0);

    private volatile int keyCount = 0;

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public int getKeyCount() { return keyCount; }

    public Selector getSelector() { return selector;}

    /**
     * Destroy the poller.
     */
    protected void destroy() {
        // Wait for polltime before doing anything, so that the poller threads
        // exit, otherwise parallel closure of sockets which are still
        // in the poller can cause problems
        close = true;
        selector.wakeup();
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }

    /**
     * Add specified socket and associated pool to the poller. The socket will
     * be added to a temporary array, and polled first after a maximum amount
     * of time equal to pollTime (in most cases, latency will be much lower,
     * however).
     *
     * @param socket to add to the poller
     * @param interestOps Operations for which to register this socket with
     *                    the Poller
     */
    public void add(final NioChannel socket, final int interestOps) {
        PollerEvent r = eventCache.pop();
        if (r == null) {
            r = new PollerEvent(socket,null,interestOps);
        } else {
            r.reset(socket,null,interestOps);
        }
        addEvent(r);
        if (close) {
            NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
            processSocket(ka, SocketEvent.STOP, false);
        }
    }

    /**
     * Processes events in the event queue of the Poller.
     *
     * @return <code>true</code> if some events were processed,
     *   <code>false</code> if queue was empty
     */
    public boolean events() {
        boolean result = false;

        PollerEvent pe = null;
        for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
            result = true;
            try {
                pe.run();
                pe.reset();
                if (running && !paused) {
                    eventCache.push(pe);
                }
            } catch ( Throwable x ) {
                log.error("",x);
            }
        }

        return result;
    }

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket    The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) {
            r = new PollerEvent(socket,ka,OP_REGISTER);
        } else {
            r.reset(socket,ka,OP_REGISTER);
        }
        addEvent(r);
    }

    public NioSocketWrapper cancelledKey(SelectionKey key) {
        NioSocketWrapper ka = null;
        try {
            if ( key == null )
             {
                return null;//nothing to do
            }
            ka = (NioSocketWrapper) key.attach(null);
            if (ka != null) {
                // If attachment is non-null then there may be a current
                // connection with an associated processor.
                getHandler().release(ka);
            }
            if (key.isValid()) {
                key.cancel();
            }
            // If it is available, close the NioChannel first which should
            // in turn close the underlying SocketChannel. The NioChannel
            // needs to be closed first, if available, to ensure that TLS
            // connections are shut down cleanly.
            if (ka != null) {
                try {
                    ka.getSocket().close(true);
                } catch (Exception e){
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.socketCloseFail"), e);
                    }
                }
            }
            // The SocketChannel is also available via the SelectionKey. If
            // it hasn't been closed in the block above, close it now.
            if (key.channel().isOpen()) {
                try {
                    key.channel().close();
                } catch (Exception e) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.channelCloseFail"), e);
                    }
                }
            }
            try {
                if (ka != null && ka.getSendfileData() != null
                        && ka.getSendfileData().fchannel != null
                        && ka.getSendfileData().fchannel.isOpen()) {
                    ka.getSendfileData().fchannel.close();
                }
            } catch (Exception ignore) {
            }
            if (ka != null) {
                countDownConnection();
                ka.closed = true;
            }
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            if (log.isDebugEnabled()) {
                log.error("",e);
            }
        }
        return ka;
    }

    /**
     * The background thread that adds sockets to the Poller, checks the
     * poller for triggered events and hands the associated socket off to an
     * appropriate processor as events occur.
     */
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {

            boolean hasEvents = false;

            try {
                if (!close) {
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        // If we are here, means we have other stuff to do
                        // Do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("",x);
                continue;
            }
            // Either we timed out or we woke up, process events first
            if (keyCount == 0) {
                hasEvents = (hasEvents | events());
            }

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                iterator.remove();
                NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (socketWrapper != null) {
                    processKey(sk, socketWrapper);
                }
            }

            // Process timeouts
            timeout(keyCount,hasEvents);
        }

        getStopLatch().countDown();
    }

    protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
        try {
            if (close) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                        if (closeSocket) {
                            cancelledKey(sk);
                        }
                    }
                }
            } else {
                // Invalid key
                cancelledKey(sk);
            }
        } catch (CancelledKeyException ckx) {
            cancelledKey(sk);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.error("",t);
        }
    }

    public SendfileState processSendfile(SelectionKey sk, NioSocketWrapper socketWrapper,
            boolean calledByProcessor) {
        NioChannel sc = null;
        try {
            unreg(sk, socketWrapper, sk.readyOps());
            SendfileData sd = socketWrapper.getSendfileData();

            if (log.isTraceEnabled()) {
                log.trace("Processing send file for: " + sd.fileName);
            }

            if (sd.fchannel == null) {
                // Setup the file channel
                File f = new File(sd.fileName);
                @SuppressWarnings("resource") // Closed when channel is closed
                FileInputStream fis = new FileInputStream(f);
                sd.fchannel = fis.getChannel();
            }

            // Configure output channel
            sc = socketWrapper.getSocket();
            // TLS/SSL channel is slightly different
            WritableByteChannel wc = ((sc instanceof SecureNioChannel) ? sc : sc.getIOChannel());

            // We still have data in the buffer
            if (sc.getOutboundRemaining() > 0) {
                if (sc.flushOutbound()) {
                    socketWrapper.updateLastWrite();
                }
            } else {
                long written = sd.fchannel.transferTo(sd.pos,sd.length,wc);
                if (written > 0) {
                    sd.pos += written;
                    sd.length -= written;
                    socketWrapper.updateLastWrite();
                } else {
                    // Unusual not to be able to transfer any bytes
                    // Check the length was set correctly
                    if (sd.fchannel.size() <= sd.pos) {
                        throw new IOException("Sendfile configured to " +
                                "send more data than was available");
                    }
                }
            }
            if (sd.length <= 0 && sc.getOutboundRemaining()<=0) {
                if (log.isDebugEnabled()) {
                    log.debug("Send file complete for: " + sd.fileName);
                }
                socketWrapper.setSendfileData(null);
                try {
                    sd.fchannel.close();
                } catch (Exception ignore) {
                }
                // For calls from outside the Poller, the caller is
                // responsible for registering the socket for the
                // appropriate event(s) if sendfile completes.
                if (!calledByProcessor) {
                    switch (sd.keepAliveState) {
                    case NONE: {
                        if (log.isDebugEnabled()) {
                            log.debug("Send file connection is being closed");
                        }
                        close(sc, sk);
                        break;
                    }
                    case PIPELINED: {
                        if (log.isDebugEnabled()) {
                            log.debug("Connection is keep alive, processing pipe-lined data");
                        }
                        if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                            close(sc, sk);
                        }
                        break;
                    }
                    case OPEN: {
                        if (log.isDebugEnabled()) {
                            log.debug("Connection is keep alive, registering back for OP_READ");
                        }
                        reg(sk, socketWrapper, SelectionKey.OP_READ);
                        break;
                    }
                    }
                }
                return SendfileState.DONE;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("OP_WRITE for sendfile: " + sd.fileName);
                }
                if (calledByProcessor) {
                    add(socketWrapper.getSocket(),SelectionKey.OP_WRITE);
                } else {
                    reg(sk,socketWrapper,SelectionKey.OP_WRITE);
                }
                return SendfileState.PENDING;
            }
        } catch (IOException x) {
            if (log.isDebugEnabled()) {
                log.debug("Unable to complete sendfile request:", x);
            }
            if (!calledByProcessor && sc != null) {
                close(sc, sk);
            }
            return SendfileState.ERROR;
        } catch (Throwable t) {
            log.error("", t);
            if (!calledByProcessor && sc != null) {
                close(sc, sk);
            }
            return SendfileState.ERROR;
        }
    }

    protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
        // This is a must, so that we don't have multiple threads messing with the socket
        reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
    }

    protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
        sk.interestOps(intops);
        socketWrapper.interestOps(intops);
    }

    protected void timeout(int keyCount, boolean hasEvents) {
        long now = System.currentTimeMillis();
        // This method is called on every loop of the Poller. Don't process
        // timeouts on every loop of the Poller since that would create too
        // much load and timeouts can afford to wait a few seconds.
        // However, do process timeouts if any of the following are true:
        // - the selector simply timed out (suggests there isn't much load)
        // - the nextExpiration time has passed
        // - the server socket is being closed
        if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
            return;
        }
        int keycount = 0;
        try {
            for (SelectionKey key : selector.keys()) {
                keycount++;
                try {
                    NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                    if ( ka == null ) {
                        cancelledKey(key); //we don't support any keys without attachments
                    } else if (close) {
                        key.interestOps(0);
                        ka.interestOps(0); //avoid duplicate stop calls
                        processKey(key,ka);
                    } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                              (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                        boolean isTimedOut = false;
                        // Check for read timeout
                        if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                            long delta = now - ka.getLastRead();
                            long timeout = ka.getReadTimeout();
                            isTimedOut = timeout > 0 && delta > timeout;
                        }
                        // Check for write timeout
                        if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                            long delta = now - ka.getLastWrite();
                            long timeout = ka.getWriteTimeout();
                            isTimedOut = timeout > 0 && delta > timeout;
                        }
                        if (isTimedOut) {
                            key.interestOps(0);
                            ka.interestOps(0); //avoid duplicate timeout calls
                            ka.setError(new SocketTimeoutException());
                            if (!processSocket(ka, SocketEvent.ERROR, true)) {
                                cancelledKey(key);
                            }
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    cancelledKey(key);
                }
            }
        } catch (ConcurrentModificationException cme) {
            // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
            log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
        }
        // For logging purposes only
        long prevExp = nextExpiration;
        nextExpiration = System.currentTimeMillis() +
                socketProperties.getTimeoutInterval();
        if (log.isTraceEnabled()) {
            log.trace("timeout completed: keys processed=" + keycount +
                    "; now=" + now + "; nextExpiration=" + prevExp +
                    "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
                    "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
        }

    }
}  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523

# LimitLatch 原理

我是思考集结处欢迎你的关注
看板娘