« | July 2025 | » | 日 | 一 | 二 | 三 | 四 | 五 | 六 | | | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | | | |
| 公告 |
|
Blog信息 |
blog名称:FoxWolf 日志总数:127 评论数量:246 留言数量:0 访问次数:849993 建立时间:2006年5月31日 |

| |
[Linux学习]IO复用,linux poll 文章收藏, 电脑与网络
FoxWolf 发表于 2008/3/12 11:20:30 |
Part1:
五个I/O模型 1.阻塞I/O 2.非阻塞I/O 3.I/O复用(select和poll) 4.信号驱动I/O(SIGIO) 5.异步I/O 阻塞 I/O模型 进程调用recvfrom,此系统调用直到数据报到达且拷贝到应用缓冲区或是出错才返回。最常见的错误是系统调用被信号中断,进程阻塞的整段时间是指从调用recvfrom开始到它返回的这段时间,当进程返回成功指示时,应用进程开始处理数据报。 非阻塞方式 当请求的I/O操作不能完成时,不让进程睡眠,而应返回一个错误。前三次调用recvfrom时仍无数据返回,因此内核立即返回一个错误。第四次调用 recvfrom时,数据报已准备好,被拷贝到应用缓冲区, recvfrom返回成功指示,接着处理数据。此过程称为轮询(polling)。这对CPU时间是极大的浪费。 I/O复用模型 调用select或poll,在这两个系统调用中的某一个上阻塞,而不是阻塞于真正I/O系统调用。 阻塞于select调用,等待数据报套接口可读。当select返回套接口可读条件时,调用recevfrom将数据报拷贝到应用缓冲区中。 信号驱动I/O模型 套接口启动信号驱动I/O, 并通过系统调用sigaction安装一个信号处理程序。此系统调用立即返回,进程继续工作,它是非阻塞的。 当数据报准备好被读时,就为该进程生成一个SIGIO信号。 随即可以在信号处理程序中调用recvfrom来读数据报,井通知主循环数据已准备好被处理中。也可以通知主循环,让它来读数据报。 异步I/O模型 让内核启动操作,并在整个操作完成后(包括将数据从内核拷贝到用户自己的缓冲区)通知用户。 信号驱动I/O:由内核通知我们何时可以启动一个I/O操作, 异步I/O模型:由内核通知我们I/O操作何时完成。 -------------------------------------------------------------------------------- select 函数 允许进程指示内核等待多个事件中的任一个发生,并仅在一个或多个事件发生或经过某指定的时间后才唤醒进程。 作为一个例子,我们可以调用函数select并通知内核仅在下列情况发生时才返回 集合{1,4,5}中的任何报述字准备好读 集合{2,7}的任何描述字准备好写 集合{1,4}中的任何描述字有异常条件待处理 已经过了10.2秒 通知内核我们对哪些描述字感兴趣(读、写或异常条件)以及等待多长时间。 描述字不受限于套接口:任何描述字(例如文件描述字)都可用select来测试。 select 定义 int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout); maxfdp1 : 描述字最大值 readset : 读描述字集 writeset : 写描述字集 exceptset : 异常条件的描述字集 timeout : 等待时间 readset, writeset和exceptset 让内核测试读、写和异常条件所需的描述字。 为这三个参数的每一个指定一个或多个描述字值 描述字集,是一个整数数组,每个数中的每一位对应一个描述字。 数组的第一个元素对应于描述字0-31, 数组户的第二个元素对应于描述字32—63。 例子: fd_set rset; //定义描述字集数据类型 FD_ZERO (&rset); //对描述字集初始化 FD_SET(1, &rset); //打开描述字的第1位 FD_SET(4, &rset) // //打开描述字的第4位 ...... FD_ISSET(4, &rest) //测试描述字的第4位 FD_CLR(4, &rset) // //关闭描述字的第4位 readset 套接口准备好读 套接口接收缓冲区中的数据字节数>= 套接口接收缓冲区低潮限度的当前值 连接的读这一半关闭(接收了FIN的TCP连接) 套接口是一个监听套接口旦已完成的连接数为非0。 有一个套接口错误待处理。 writeset 套接口准备好写 套接口发送缓冲区中的可用空间字节数大干等于套接口发送缓冲区低潮限度的 当前值,且或者(i)套接口已连接,或者(i)套接口不要求连接 连接的写这一半关闭。对这样的套接口的写操作将产生信SIGPIPE。 有一个套接口错误待处理。对这样的套接口的写操作将不阻塞且返回一个错误(一1) exceptset异常条件待处理 如果一个套接口存在带外数据或者仍处于带外标记,那它有异常条件待处理。 带外数据(out—of—band data),有时也称为加速数据(expedited data), 是指连接双方中的一方发生重要事情,想要迅速地通知对方。 这种通知在已经排队等待发送的任何“普通”(有时称为“带内”)数据之前发送。 带外数据设计为比普通数据有更高的优先级。 带外数据是映射到现有的连接中的,而不是在客户机和服务器间再用一个连接。 最大描述字 maxfdp1 当select刚开始设计时,操作系统常对每个进程可用的最大描述字数上限作出 限制(4.2BSD的限制为31),select也就用相同的限制值。 unix版本对每个进程的描述字数根本不作限制 (仅受限于内存量和管理性限制), #include #DEFINE FD_SETSIZE 256 -------------------------------------------------------------------------------- str_cli 函数的修订版 服务器进程一终止客户就能马上得到通知 早期版本的问题就在于当套接口上发生了某些事件时,客户可能阻塞于fgets调用, 新版本则阻塞于select调用:等待标准输入,等待套接口可读。 //旧的回射服务器客户端main程序 #include “unp.h“ //包含头文件 int main(int argc, char **argv) //argv是命令行的第二个参数 { int sockfd; //套接口描述字 struct sockaddr_in servaddr; //IPv4地址结构 if (argc != 2) //命令行要有第二个参数(服务器地址) err_quit("usage: tcpcli "); sockfd = Socket(AF_INET, SOCK_STREAM, 0); // bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); Inet_pton(AF_INET, argv[1], &servaddr.sin_addr); Connect(sockfd, (SA *) &servaddr, sizeof(servaddr)); str_cli(stdin, sockfd); /* do it all */ exit(0); } //str_cli #include "unp.h" void str_cli ( FILE *fp, int sockfd) { char sendline[MAXLINE], recvline[MAXLINE]; while (Fgets ( sendline, MAXLINE, fp) ! = NULL) { Writen( sockfd, sendline, strlen(sendline) ); if ( Readline ( sockfd, recvline, MAXLINE ) = = 0) err_quit("str_cli: server terminated prematurely"); Fputs(recvline, stdout); } } 对套接口的处理 对方TCP发送数据,套接口就变为可读且read返回大于0 对方TCP发送一个FIN(对方进程终止),套接口就变为可读且read返回0(文件结束)。 对方TCP发送一个RST (对方主机崩溃并重新启动),套接口变为可读且返回-1 #include "unp.h" void str_cli(FILE *fp, int sockfd) { int maxfdp1; //最大描述字 fd_set rset; //描述字集 char sendline[MAXLINE], recvline[MAXLINE]; FD_ZERO(&rset); //描述字集清零(空集) for ( ; ; ) { FD_SET(fileno(fp), &rset); //打开文件描述字的测试 FD_SET(sockfd, &rset); // //打开套接口描述字的测试 maxfdp1 = max(fileno(fp), sockfd) + 1; //获得最大描述字 Select(maxfdp1, &rset, NULL, NULL, NULL); //对是否可读进行测试 if (FD_ISSET(sockfd, &rset)) { //如果套接口可读 if (Readline(sockfd, recvline, MAXLINE) = = 0) //读入一行 err_quit(“str_cli: server termi. premat..”); //对方终止时退出 Fputs(recvline, stdout); //写到标准输出 } if (FD_ISSET(fileno(fp), &rset)) { //如果标准输入可读 if (Fgets(sendline, MAXLINE, fp) == NULL) //读入一行 return; // 遇到^D时退出子程序 Writen(sockfd, sendline, strlen(sendline)); //写入套接口 } } } -------------------------------------------------------------------------------- shutdown 函数 close有两个限制可由函数shutdown来避免: close将描述字的访问计数减1,仅在此计数为0时才关闭套接口 shutdown可激发TCP的正常连接终止序列, 而不管访问计数。 close终止了数据传送的两个方向:读和写。 shutdown终止的数据传送的两个方向:读和写, 或其中任一方向:读或写 定义: int shutdown( int sockfd, int howto) ; howto选项: SHUT_RD 关闭连接的读一半 SHUT_WR 关闭连接的写这一半 SHUT_RDWR 关闭连接读读和写 -------------------------------------------------------------------------------- str_cli函数(再修订) void str_cli(FILE *fp, int sockfd) { int maxfdp1, stdineof ; fd_set rset; char sendline[MAXLINE], recvline[MAXLINE]; stdineof=0; FD_ZERO(&rset); for ( ; ; ) { if (stdineof == 0) FD_SET(fileno(fp), &rset); FD_SET(sockfd, &rset); maxfdp1 = max(fileno(fp), sockfd) + 1; Select(maxfdp1, &rset, NULL, NULL, NULL); if (FD_ISSET(sockfd, &rset)) { /* socket is readable */ if (Readline(sockfd, recvline, MAXLINE) == 0) { if (stdineof == 1) return; /* normal termination */ else err_quit(“str_cli: server ERR. "); } Fputs(recvline, stdout); } if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */ if (Fgets(sendline, MAXLINE, fp) == NULL) { stdineof = 1; Shutdown(sockfd, SHUT_WR); /* send FIN */ FD_CLR(fileno(fp), &rset); continue; } Writen(sockfd, sendline, strlen(sendline)); } } } -------------------------------------------------------------------------------- TCP回射服务器程序(修订版) 使用select来处理任意数目的客户的单进程程序 不为每个客户派生一个子进程,避免了创建一个新进程的所有开销。 监听套接口 服务器只维护一个读描述字集 描述字0、1和2分别被设置为标准输入、标准输出和标准错误输出 监听套接口的第一个可用的描述字是3。 与第一个客户建立连接 监听描述字变为可读,于是服务器调用accept。 由accept返回的新的已连接描述字将是4。 第一个客户终止与服务器的连接 客户TCP发送一个FIN,这使得服务器中的描述字4变为可读。 当服务器读此已连接套接口时,readline返回0。 关闭此套接口并相应地更新数据结构,数组元素client[0]]的值置为一1, 描述字集中的描述字4被置为0,maxfd的值没有改变。 //源程序 int main(int argc, char **argv) { int i, maxi, maxfd, listenfd, connfd, sockfd; int nready, client[FD_SETSIZE]; ssize_t n; fd_set rset, allset; char line[MAXLINE]; socklen_t clilen; struct sockaddr_in cliaddr, servaddr; listenfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd, (SA *) &servaddr, sizeof(servaddr)); Listen(listenfd, LISTENQ); maxfd = listenfd; /* initialize */ maxi = -1; /* index into client[] array */ for (i = 0; i < FD_SETSIZE; i++) client[i] = -1; /* -1 indicates available entry */ FD_ZERO(&allset); FD_SET(listenfd, &allset); for ( ; ; ) { rset = allset; /* structure assignment */ nready = Select(maxfd+1, &rset, NULL, NULL, NULL); if (FD_ISSET(listenfd, &rset)) { /* new client connection */ clilen = sizeof(cliaddr); connfd = Accept(listenfd, (SA *) &cliaddr, &clilen); for (i = 0; i < FD_SETSIZE; i++) if (client[i] < 0) { client[i] = connfd; /* save descriptor */ break } if (i == FD_SETSIZE) err_quit("too many clients"); FD_SET(connfd, &allset); /* add new descriptor to set */ if (connfd > maxfd) maxfd = connfd; /* for select */ if (i > maxi) maxi = i; /* max index in client[] array */ if (--nready <= 0) continue; /* no more readable descriptors */ } for (i = 0; i <= maxi; i++) { /* check all clients for data */ if ( (sockfd = client[i]) < 0) continue; if (FD_ISSET(sockfd, &rset)) { if ( (n = Readline(sockfd, line, MAXLINE)) == 0) { /*4connection closed by client */ Close(sockfd); FD_CLR(sockfd, &allset); client[i] = -1; } else Writen(sockfd, line, n); if (--nready <= 0) break; /* no more readable descriptors */ } } } //end of for loop } //end of main -------------------------------------------------------------------------------- poll 函数 原型: int poll (struct pollfd *fdarray, unsigned long nfds, int timeout ) 第一个参数是指向结构数组第一个元素的指针: struct pollfd{ int fd; // descriptor short events // events of interest on fd short revents // events that occured on fd } 第二个参数是套接字个数,第三个参数是等待时间 -------------------------------------------------------------------------------- TCP回射服务器程序(再修订) 用poll而不是用select来重写回射服务器程序。 在select版本中,必须分配一个client数组以及一个名为rset的描述字集。 使用poll时, 必须分配一个poll结构的数组来维护客户信息,而不是分配另一个数组。 与select中处理数组client相同的方法, 处理此数组的fd成员,值一1表示条目未用, 否则即为描述字值。 传递给poll的pollfd结构数组中的任何N成员为负值的条目都是被忽略的。 //源程序: #include "unp.h" #include /* for OPEN_MAX */ int main(int argc, char **argv) { int i, maxi, listenfd, connfd, sockfd; int nready; ssize_t n; char line[MAXLINE]; socklen_t clilen; struct pollfd client[OPEN_MAX]; struct sockaddr_in cliaddr, servaddr; listenfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd, (SA *) &servaddr, sizeof(servaddr)); Listen(listenfd, LISTENQ); client[0].fd = listenfd; client[0].events = POLLRDNORM; for (i = 1; i < OPEN_MAX; i++) client[i].fd = -1; /* -1 indicates available entry */ maxi = 0; for ( ; ; ) { nready = Poll(client, maxi+1, INFTIM); /* new client connection */ if (client[0].revents & POLLRDNORM) { clilen = sizeof(cliaddr); connfd = Accept(listenfd, (SA *) &cliaddr, &clilen); for (i = 1; i < OPEN_MAX; i++) if (client[i].fd < 0) { client[i].fd = connfd; // save descriptor break; } if (i == OPEN_MAX) err_quit("too many clients"); client[i].events = POLLRDNORM; if (i > maxi) maxi = i; /* max index in client[] array */ if (--nready <= 0) continue; /* no more readable descriptors */ } for (i = 1; i <= maxi; i++) { /* check all clients for data */ if ( (sockfd = client[i].fd) < 0) continue; if (client[i].revents & (POLLRDNORM | POLLERR)) { if ( (n = readline(sockfd, line, MAXLINE)) < 0) { if (errno == ECONNRESET) { /*4connection reset by client */ Close(sockfd); client[i].fd = -1; } else err_sys("readline error"); } else if (n == 0) { /*4connection closed by client */ Close(sockfd); client[i].fd = -1; } else Writen(sockfd, line, n); if (--nready <= 0) break; /* no more readable descriptors */ } } } }
以上都是来自<<Unix编程编程>>
Par2:
http://blog.csdn.net/dreamtofly/archive/2007/04/12/1561586.aspx
2.1. 如何管理多个连接?“我想同时监控一个以上的文件描述符(fd)/连接(connection)/流(stream),应该怎么办?” 使用 select() 或 poll() 函数。 注 意:select() 在BSD中被引入,而poll()是SysV STREAM流控制的产物。因此,这里就有了平台移植上的考虑:纯粹的BSD系统可 能仍然缺少poll(),而早一些的SVR3系统中可能没有select(),尽管在SVR4中将其加入。目前两者都是POSIX. 1g标准,(译者 注:因此在Linux上两者都存在) select()和poll()本质上来讲做的是同一件事,只是完成的方法不一样。两者都通过检验一组文件描述符来检测是否有特定的时间将在上面发生并在一定的时间内等待其发生。 [重要事项:无论select()还是poll()都不对普通文件起很大效用,它们着重用于套接口(socket)、管道(pipe)、伪终端(pty)、终端设备(tty)和其他一些字符设备,但是这些操作都是系统相关(system-dependent)的。] 2.1.1. 我如何使用select()函数?select()函数的接口主要是建立在一种叫'fd_set'类型的基础上。它('fd_set') 是一组文件描述符(fd)的集合。由于fd_set类型的长度在不同平台上不同,因此应该用一组标准的宏定义来处理此类变量: fd_set set; FD_ZERO(&set); /* 将set清零 */ FD_SET(fd, &set); /* 将fd加入set */ FD_CLR(fd, &set); /* 将fd从set中清除 */ FD_ISSET(fd, &set); /* 如果fd在set中则真 */ 在 过去,一个fd_set通常只能包含少于等于32个文件描述符,因为fd_set其实只用了一个int的比特矢量来实现,在大多数情况下,检查 fd_set能包括任意值的文件描述符是系统的责任,但确定你的fd_set到底能放多少有时你应该检查/修改宏FD_SETSIZE的值。*这个值是系 统相关的*,同时检查你的系统中的select() 的man手册。有一些系统对多于1024个文件描述符的支持有问题。[译者注: Linux就是这样 的系统!你会发现sizeof(fd_set)的结果是128(*8 = FD_SETSIZE=1024) 尽管很少你会遇到这种情况。] select的基本接口十分简单: int select(int nfds, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout); 其中: nfds 需要检查的文件描述符个数,数值应该比是三组fd_set中最大数 更大,而不是实际文件描述符的总数。readset 用来检查可读性的一组文件描述符。writeset 用来检查可写性的一组文件描述符。exceptset 用来检查意外状态的文件描述符。(注:错误并不是意外状态)timeout NULL指针代表无限等待,否则是指向timeval结构的指针,代表最 长等待时间。(如果其中tv_sec和tv_usec都等于0, 则文件描述符 的状态不被影响,但函数并不挂起) 函数将返回响应操作的对应操作文件描述符的总数,且三组数据均在恰当位置被修改,只有响应操作的那一些没有修改。接着应该用FD_ISSET宏来查找返回的文件描述符组。 这里是一个简单的测试单个文件描述符可读性的例子: int isready(int fd) { int rc; fd_set fds; struct timeval tv; FD_ZERO(&fds); FD_SET(fd,&fds); tv.tv_sec = tv.tv_usec = 0; rc = select(fd+1, &fds, NULL, NULL, &tv); if (rc < 0) return -1; return FD_ISSET(fd,&fds) ? 1 : 0; } 当然如果我们把NULL指针作为fd_set传入的话,这就表示我们对这种操作的发生不感兴趣,但select() 还是会等待直到其发生或者超过等待时间。 [译 者注:在Linux中,timeout指的是程序在非sleep状态中度过的时间,而不是实际上过去的时间,这就会引起和非Linux平台移植上的时间不 等问题。移植问题还包括在System V风格中select()在函数退出前会把timeout设为未定义的 NULL状态,而在BSD中则不是这样, Linux在这点上遵从System V,因此在重复利用timeout指针问题上也应该注意。] 2.1.2. 我如何使用poll()?poll ()接受一个指向结构'struct pollfd'列表的指针,其中包括了你想测试的文件描述符和事件。事件由一个在结构中事件域的比特掩码确定。当前 的结构在调用后将被填写并在事件发生后返回。在SVR4(可能更早的一些版本)中的 "poll.h"文件中包含了用于确定事件的一些宏定义。事件的等待 时间精确到毫秒 (但令人困惑的是等待时间的类型却是int),当等待时间为0时,poll()函数立即返回,-1则使poll()一直挂起直到一个指定 事件发生。下面是pollfd的结构。 struct pollfd { int fd; /* 文件描述符 */ short events; /* 等待的事件 */ short revents; /* 实际发生了的事件 */ }; 于select()十分相似,当返回正值时,代表满足响应事件的文件描述符的个数,如果返回0则代表在规定事件内没有事件发生。如发现返回为负则应该立即查看 errno,因为这代表有错误发生。 如果没有事件发生,revents会被清空,所以你不必多此一举。 这里是一个例子 /* 检测两个文件描述符,分别为一般数据和高优先数据。如果事件发生 则用相关描述符和优先度调用函数handler(),无时间限制等待,直到 错误发生或描述符挂起。*/ #include <stdlib.h> #include <stdio.h> #include <sys/types.h> #include <stropts.h> #include <poll.h> #include <unistd.h> #include <errno.h> #include <string.h> #define NORMAL_DATA 1 #define HIPRI_DATA 2 int poll_two_normal(int fd1,int fd2) { struct pollfd poll_list[2]; int retval; poll_list[0].fd = fd1; poll_list[1].fd = fd2; poll_list[0].events = POLLIN|POLLPRI; poll_list[1].events = POLLIN|POLLPRI; while(1) { retval = poll(poll_list,(unsigned long)2,-1); /* retval 总是大于0或为-1,因为我们在阻塞中工作 */ if(retval < 0) { fprintf(stderr,"poll错误: %s\n",strerror(errno)); return -1; } if(((poll_list[0].revents&POLLHUP) == POLLHUP) || ((poll_list[0].revents&POLLERR) == POLLERR) || ((poll_list[0].revents&POLLNVAL) == POLLNVAL) || ((poll_list[1].revents&POLLHUP) == POLLHUP) || ((poll_list[1].revents&POLLERR) == POLLERR) || ((poll_list[1].revents&POLLNVAL) == POLLNVAL)) return 0; if((poll_list[0].revents&POLLIN) == POLLIN) handle(poll_list[0].fd,NORMAL_DATA); if((poll_list[0].revents&POLLPRI) == POLLPRI) handle(poll_list[0].fd,HIPRI_DATA); if((poll_list[1].revents&POLLIN) == POLLIN) handle(poll_list[1].fd,NORMAL_DATA); if((poll_list[1].revents&POLLPRI) == POLLPRI) handle(poll_list[1].fd,HIPRI_DATA); } } 2.1.3. 我是否可以同时使用SysV IPC和select()/poll()?*不能。* (除非在AIX上,因为它用一个无比奇怪的方法来实现这种组合) 一般来说,同时使用select()或poll()和SysV 消息队列会带来许多麻烦。SysV IPC的对象并不是用文件描述符来处理的,所以它们不能被传递给select()和 poll()。这里有几种解决方法,其粗暴程度各不相同: 完全放弃使用SysV IPC。 :-)用fork(),然后让子进程来处理SysV IPC,然后用管道或套接口和父进程 说话。父进程则使用select()。 同上,但让子进程用select(),然后和父亲用消息队列交流。 安排进程发送消息给你,在发送消息后再发送一个信号。*警告*:要做好 这个并不简单,非常容易写出会丢失消息或引起死锁的程序。 另外还有其他方法。 |
|
|