`
agapple
  • 浏览: 1582998 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

I/O的知识储备

    博客分类:
  • java
阅读更多

背景

前一段时间一直在关注一些nio的相关技术,也和公司的架构师交流了一下,学到了一些相关原理,比如nio的优势和劣势。以及一些排查nio bug问题的方式,受益量多。为自己做一下技术储备,以后可以多玩玩nio的相关技术

知识点

unix网络编程第6章: 几种unix下的I/O模型。

 

  • 阻塞I/O模型    (java io)
  • 非阻塞I/O模型
  • I/O复用          (java 5的nio)
  • 信号驱动I/O
  • 异步I/O          (java7的aio)
几点说明: 
1.  阻塞和非阻塞的概念
   两者的区别侧重点在于,当前的线程是否会处于挂起,阻塞的状态。
2.  同步和异步的概念
   两者的区别侧重点在于,是当前业务的处理方式是否是一个串行的过程,异步的操作也可能是阻塞的动作。

几种模型的比较:
 

阻塞I/O模型: 

这个在java中平时使用比较多,不用多做介绍。 注意下stream &  reader的区别,自己面试别人也问的比较多。

I/O复用模型: 

介绍java nio之前,先了解一下unix中几种i/o复用的支持: select / poll 模型。 

linux早期有select / pselect : 

select 函数: 
#include<sys/select.h>

int select(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, struct timeval* restrict tvptr);
参数说明: 
1.  最大描述符大小,一般是最大值+1。
2.  中间三个参数readfds、writefds和exceptfds是指向描述符集的指针。这三个描述符集说明了我们关心的可读、可写或出于异常条件的各个描述符。就是一个位表,每次check一下当前描述符在对应位上的值是否为1
3.  第4个参数,就是对应的超时参数,精确到微妙,细节我也不关注。

返回值: 0超时,-1出错, 正数代表准备好的描述符

pselect函数: 
#include <sys/select.h>

int pselect(int maxfdp1, fd_set *restrict readfds, fd_set *restrict writefds,fd_set *restrict exceptfds, const struct timespec *restrict tsptr,const sigset_t *restrict sigmask);
 

 

它与select的区别在于:
  pselect使用timespec结构指定超时值。timespec结构以秒和纳秒表示时间,而非秒和微秒。
  pselect的超时值被声明为const,这保证了调用pselect不会改变timespec结构。
  pselect可使用一个可选择的信号屏蔽字。在调用pselect时,以原子操作的方式安装该信号屏蔽字,在返回时恢复以前的信号屏蔽字。 多了参数5

poll函数: 
#include <poll.h>

int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);
参数说明: 
1. pollfd是一个结构体,我们需要关注的描述符

 

struct pollfd {     
     int fd; /* file descriptor to check, or <0 to ignore */
     short events; /* events of interest on fd */
     short revents; /* events that occurred on fd */
};

2. nfds代表pollfd的长度

 

返回值:0超时,-1出错, 正数代表准备好的描述符

 

同样变种的有ppoll函数,具体可以见man ppoll,两者的区别和select/pselect区别一样,多了时间精度的支持+信号屏蔽字

 

和select系列的区别点,poll不再受限于select中位数组的长度限制,我们可以将关心的描述符添加到poolfd中。

 

再看epoll函数,是对select/poll的一个增强版:

 写道
它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用 率:
1. 因为它不会复用文件描述符集合来传递结果而迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合
2. 另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll的除了提供select/poll 那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

电平触发(Level Triggered): select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息
边沿触发(Edge Triggered: 只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发。

 

几个接口: 

 

int epoll_create(int size);

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
1. 第一个参数是epoll_create()的返回值.
2. 第二个参数表示动作,用三个宏来表示:
    EPOLL_CTL_ADD:注册新的fd到epfd中;
    EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
    EPOLL_CTL_DEL:从epfd中删除一个fd;
3. 第三个参数是需要监听的fd
4. 第四个参数是告诉内核需要监听什么事
events可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1. 等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,
2. 参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有 说法说是永久阻塞)。
该函数返回需要处理的事件数目,如返回0表示已超时。

 

 

再看一下jdk中对nio的使用:

 

public static SelectorProvider create() {
	PrivilegedAction pa = new GetPropertyAction("os.name");
	String osname = (String) AccessController.doPrivileged(pa);
        if ("SunOS".equals(osname)) {
            return new sun.nio.ch.DevPollSelectorProvider();
        }

        // use EPollSelectorProvider for Linux kernels >= 2.6
        if ("Linux".equals(osname)) {
            pa = new GetPropertyAction("os.version");
            String osversion = (String) AccessController.doPrivileged(pa);
            String[] vers = osversion.split("\\.", 0);
            if (vers.length >= 2) {
                try {
                    int major = Integer.parseInt(vers[0]);
                    int minor = Integer.parseInt(vers[1]);
                    if (major > 2 || (major == 2 && minor >= 6)) {
                        return new sun.nio.ch.EPollSelectorProvider();
                    }
                } catch (NumberFormatException x) {
                    // format not recognized
                }
            }
        }

        return new sun.nio.ch.PollSelectorProvider();
    }

 

 

比较明显: 如果当前是sunos系统,直接使用DevPoll,在linux 2.6内核下,使用Epoll模型,否则使用Poll。

 

DevPoll估计是sunos自己整的一套poll模型,公司一般使用redhat系列,内核2.6.18,64位主机。所以就介绍下Epoll的实现

 

java实现类: EPollSelectorImpl

 

// wake up使用的两描述符
protected int fd0;
protected int fd1;

// The poll object , native的实现
EPollArrayWrapper pollWrapper;

// Maps from file descriptors to keys , 文件描述符和SelectKey的关系
private HashMap fdToKey;

 

 

看下select()的实现:

PollSelectorImpl类: ==============

protected int doSelect(long timeout) //具体执行epoll调用
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

继续看下EPollArrayWrappe :  ==========================

  int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) { // 这里判断下当前响应的描述符是否为fd0,后面再细说
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }

继续看下EPollArrayWrapper 的native实现: epollWait():  ====================
JNIEXPORT jint JNICALL 
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout); 
    }
    
    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

static int iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout) 
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;  //转化为ns单位

    for (;;) {
        int res = (*epoll_wait_func)(epfd, events, numfds, timeout);
        if (res < 0 && errno == EINTR) {  //处理异常
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}

 

看下wakeup的实现 : 

 

EPollSelectorImpl类: 

    EPollSelectorImpl(SelectorProvider sp) {
	super(sp);
        int[] fdes = new int[2];
        IOUtil.initPipe(fdes, false);
        fd0 = fdes[0];
        fd1 = fdes[1];
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);    // 设置中断的两个描述符
        fdToKey = new HashMap();
    }
     public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                pollWrapper.interrupt();   //调用warpper进行中断
                interruptTriggered = true;
            }
        }
	return this;
    }

继续看下EPollArrayWrapper : 

    void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;  //保存pipeline的描述符
        incomingInterruptFD = fd0;
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);   //注册到epoll上。
    }
    public void interrupt() {
        interrupt(outgoingInterruptFD);  //调用native方法
    }
 
继续看下EPollArrayWrapper.c native实现: 

JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
    int fakebuf[1];
    fakebuf[0] = 1;
    if (write(fd, fakebuf, 1) < 0) {  //发送一字节的内容,让epoll_wait()能得到及时响应
        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
    }
}

 实现方式也是挺简单的,弄了两个fd,一个往另一个写1byte的内容,促使epoll_wait能得到响应。

 

异步I/O模型: 

 

暂时还未真实用过,只是大致看过其api,有兴趣可以自己baidu。 

 

 

最后

 

后续会起一篇,单独针对nio在服务端和客户端使用上的注意点,主要是吸收了一些大牛门的经验,需要做个总结,消化一下。

  • 大小: 51.6 KB
  • 大小: 86.6 KB
  • 大小: 79.6 KB
  • 大小: 73.5 KB
  • 大小: 57.7 KB
  • 大小: 89.8 KB
分享到:
评论
2 楼 agapple 2011-08-12  
y13872888163 写道
大哥 你的后续介绍这么一直没看到啊


最近忙着做项目,都没太多时间写blog。
等项目走上正轨后,一定补上 
1 楼 y13872888163 2011-08-11  
大哥 你的后续介绍这么一直没看到啊

相关推荐

Global site tag (gtag.js) - Google Analytics