皮肤病医院痤疮科:MINA源码分析的札记1

来源:百度文库 编辑:偶看新闻 时间:2024/04/29 15:50:06

MINA源码分析的札记1--Write流程

时间:2010-11-13 14:44来源:互联网 作者:互联网 点击:145次

从IoSession调用write的过程:

IoSession.write(object message)

真正实现这个方法的是AbstractIoSession

1、创建writeFuture对象,用于异步操作的返回

2、将传入的Object对象,包装成WriteRequest对象,交给IoFilterChain去处理。

3、核心的实现就是这些代码:

// Now, we can write the message. First, create a futureWriteFuture writeFuture = new DefaultWriteFuture(this);WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);// Then, get the chain and inject the WriteRequest into itIoFilterChain filterChain = getFilterChain();filterChain.fireFilterWrite(writeRequest);//.....return writeFuture;

下面看IoFilterChain是怎么处理的?

首先,IoFilterChain是用一个双向列表来保存它所包含的所有filter的,大概的结构如下:

head <-> filter1 <-> filter2 <-> ..... <-> filterN <-> tail

其中head和tail是两个固有、内置、特殊的filter,主要是衔接和过渡的功能。如tail就是负责调用IoHandler的功能。

回到IoFilterChain怎么处理write的话题。

4、IoFilterChain从列表中找到tail,从tail开始查找filter,顺序调用每个filter的filterWrite()方法

典型实现:

    public void fireFilterWrite(WriteRequest writeRequest) {        Entry tail = this.tail;        callPreviousFilterWrite(tail, session, writeRequest);    }    private void callPreviousFilterWrite(Entry entry, IoSession session,            WriteRequest writeRequest) {        try {            IoFilter filter = entry.getFilter();            NextFilter nextFilter = entry.getNextFilter();            filter.filterWrite(nextFilter, session, writeRequest);        } catch (Throwable e) {            writeRequest.getFuture().setException(e);            fireExceptionCaught(e);        }    }

5、最后必然就掉到head这个filter的filterWrite,它的实现应该有些特殊,才能把消息发送给IoProcessor

将消息放入发送队列中,然后调用IoProcessor flush出去。

 public void filterWrite(NextFilter nextFilter, IoSession session,                WriteRequest writeRequest) throws Exception {            AbstractIoSession s = (AbstractIoSession) session;            s.getWriteRequestQueue().offer(s, writeRequest);            if (!s.isWriteSuspended()) {                s.getProcessor().flush(s);            }        }

WriteRequestQueue的默认实现就是java.util.concurrent.ConcurrentLinkedQueue,舍去传入的session对象。?

在看processor怎么flush出去之前,先备忘下面的东西。

NOTE1:在这个filter传递过程中,IoSession一直被传递下去:

filterWrite(NextFilter nextFilter, IoSession session,                WriteRequest writeRequest)

NOTE2:其实head这个filter就只实现了两个方法,也就是它只关心这两个操作:

   private class HeadFilter extends IoFilterAdapter {        @SuppressWarnings("unchecked")        @Override        public void filterWrite(NextFilter nextFilter, IoSession session,                WriteRequest writeRequest) throws Exception {        }        @SuppressWarnings("unchecked")        @Override        public void filterClose(NextFilter nextFilter, IoSession session)                throws Exception {        }    }

6、IoProcessor是怎么处理的?

IoProcessor的flush方法在AbstractPollingIoProcessor类中实现。

它将传入的session对象加入到flushingSessions列表,

然后调用processor内在的selector.wakeUp (java.nio.channels.Selector),

而在AbstractPollingIoProcessor类中独立运行的Processor线程,就回从select()等待中被唤醒。

7、AbstractPollingIoProcessor.Processor线程

被唤醒之后,从flushingSessions中取出session对象,再从session中取出WriteRequest对象

如果WriteRequest对象是IoBuffer类型的,则:

if (buf.remaining() <= length) {            return session.getChannel().write(buf.buf());        }

如果WriteRequest对象是File对象,则:

region.getFileChannel().transferTo(                    region.getPosition(), length, session.getChannel());

region是FileRegion对象。就是将文件内容传输到session的Channel上去。

【这部分省略了很多细节,具体了解细节的话,主要看AbstractPollingIoProcessor类和NioProcessor类】

8、自此,才将数据真正发送到网络上去。