【什么是IOCP】
是WINDOWS系统的一个内核对象。通过此对象,应用程序可以获得异步IO的完成通知。
这里有几个角色:
角色1:异步IO请求者线程。简单的说,就是调用WSAxxx()函数(例如函数WSARecv,WSASend)的某个线程。
由于是“异步”的,当角色1线程看到WSAxxx()函数返回时,它并不能知道本次IO是否真的完成了。
注:当WSAxxx返回成功true时,实际已经读到或发送完数据了(同步的获得IO结果了)。
为了统一逻辑,我们还是要放到角色2线程中,统一处理IO结果。
角色2:异步IO完成事件处理线程。简单的说,就是调用GetQueuedCompletionStatus函数的线程。
角色1投递的某个异步IO请求M,角色2线程一定能获得M的处理结果(无非是IO成功或失败)
角色3:操作系统。负责角色1和角色2的沟通。OS接收角色1的所有异步IO请求。
OS处理(实际的IO读写)排队的很多异步IO请求。OS的程序员是很牛的,他们能最大化利用CPU和网络。
OS把所有IO结果放入{IOCP完成队列C}中。
OS能调度角色2线程的运行和睡眠,能控制角色2线程同时运行的线程个数。
角色2通过GetQueuedCompletionStatus函数,读取到{IOCP完成队列C}中完成的IO请求。
【需要创建几个角色2线程呢】
CreateIoCompletionPort()函数创建一个完成端口,其中有一个参数是NumberOfConcurrentThreads。
这个参数的含义是:程序员期望的同时运行的角色2线程数。0代表默认为本机器的CPU个数。
程序员可以创建任意数量的角色2线程。
例如:NumberOfConcurrentThreads设置为2,而实际创建6个角色2线程,或100个,或0个。
如何理解这两个数的差异呢?
OS努力维持NumberOfConcurrentThreads个线程并发的运行,即使我创建100个角色2线程。
如果{IOCP完成队列C}中排队等待处理的{IO结果项}很少,角色2线程能很快处理完,则实际可能只有1个角色2线程在工作,其他线程都在睡眠(即使NumberOfConcurrentThreads设置成100,也只有一个线程在工作)。
如果{IOCP完成队列C}中排队等待处理的{IO结果项}很多,角色2线程处理需要很多CPU时间,则实际可能会有很多角色2线程会被唤醒工作。当然前提是我实际创建了很多角色2线程。极端情况下,如果角色2线程都退出了,则{IOCP完成队列C}可能会被挤爆了。
为什么一般情况下,NumberOfConcurrentThreads设置为2,而实际创建6个角色2线程呢?
考虑到我们的角色2线程不只是CPU计算,它还可能去读写日志文件,调用Sleep,或访问某个Mutex对象(造成线程被调度为睡眠)。这样,OS会启用一些“后备军”角色2线程去处理{IOCP完成队列C}。所以实际创建6个角色2线程,有几个可能是后备军线程。如果我们的角色2线程是纯CPU密集计算型的(可能有少量的临界区访问,也不会轻易放弃CPU控制权),那么我们只需要实际创建角色2线程数=CPU个数,多创建了也没益处(但也没坏处,可能OS让他们一直都睡眠,做后备军)。
【异步读写如何控制字节数】
或曰,某个WSASend调用,在网络正常的情况下,{实际发送字节数}(简称T)就是{需要发送的字节数}(简称R)。我试验了一下,从1M的buff,2M的buff...当开到很大的buff时,终于出现T<R的时候。
如果我们的应用需要一次发送很大量的数据时,应该检查T是否小于R。当发送的字节数不足时,应该继续发送剩余的(未发送出去的)部分。
对于WSARecv接收数据,应接收多大的字节数呢?假如应用层协议规定,我们的数据长度不是固定的,这就是一个很棘手的问题。一般情况下,应用层协议规定,一段逻辑上是一组的数据,分包头部分和包体部分。包头是固定长度的,包体是变长的。包头含有如下信息:包体的长度字节数。我们先收一个固定长度的包头,从中解析出“包体长度信息”,然后我们再次发出一个WSARecv收包体。我称作这个方法为“包头包体两阶段接收法”。
【异步读写如何控制超时】
假如我们接受一个数据包,发出WSARecv{异步IO: X}。这个{异步IO: X}可能长时间无法获得结果。假如对方客户端恶意的不发送任何数据。IOCP本身机制不提供任何超时控制。只能我们程序员控制这个超时。我们发出一个WSARecv调用后,通过维护某种{数据结构: D},记住此时的时间。在未来的某个时间我们的程序要检查这个{数据结构: D}, 判断这个WSARecv调用是否有结果了。当然此{数据结构: D}的状态改变由{角色2线程}负责。
如果{角色2线程}通过GetQueuedCompletionStatus调用获得了{异步IO: X}的结果,则改变{数据结构: D}的状态。我们只要判断{数据结构: D}的某个状态未改变,则一定是这个{异步IO: X}未被完成(客户端没有发送任何数据)。
控制超时和控制字节数往往有关联。假如恶意的客户端只发送部分字节数,我们还要处理这种情况。
假如协议要求100个字节,客户端一次传来10个,我们可以毫不客气的干掉这个客户端。这个策略比较狠了些。我们需要温和一点的策略。可能因为网络原因,剩下的90个字节很快就能到来,我们可以继续在规定时间等接受剩余的90个字节。如果超时了,才把这个客户端干掉。
【IOCP系统资源耗尽的问题】
假如我们有10000个客户端socket连接,为了接收他们发送过来的数据,我们需要预先投递10000个WSARecv。
假如每个异步读需要应用层程序员提供10k的缓冲区,则一共需要的用户缓冲区为 10000*10k=97M 内存。windows要求这97M数据必须被OS“锁定”,意思大体是需要占用大量的OS的资源了。所以程序很可能会因为10000个客户同时连接,而耗尽资源。WSAENOBUF错误同此有关。
解决方法是投递0字节数请求的WSARecv。伪代码如下:
WSABUF DataBuf;
DataBuf.len=0;
DataBuf.buf=0;
WSARecv(socket, &DataBuf, 1,...);
当有数据到来时,这个异步IO会从角色2线程中得到结果。由于它是0字节的读,所以它没有触碰任何socket缓冲区的到来的任何数据。我们付出很小的成本(大约每个连接节省了10k)就能知道哪个客户端的数据到来了。别小看了每个连接节省了这么点资源,连接数大了节约的总量就很可观了。如果客户端数量很少,这个技巧就没什么意思了。
【优雅的杀死角色2线程】
PostQueuedCompletionStatus函数会向{IOCP完成队列C}中push进去一条记录。这样角色2线程就能获得这个“虚伪或模拟”的异步IO完成事件。为什么要“假冒”一条{IOCP完成队列C}的条目呢?用处吗,程序员自己去想吧(意思是用处多多了)。一般来说,我们用它“优雅的杀死角色2线程”。伪代码如下:
typedef struct
{
OVERLAPPED Overlapped;
OP_CODE op_type;
...
} PER_IO_DATA;
PER_IO_DATA* PerIOData = ...
PerIOData->op_type = OP_KILL; //操作类型是杀死线程
PostQueuedCompletionStatus(...PerIOData...);
//如果有N个角色2线程,则需要调用N次,这样{IOCP完成队列C}中才能有N个这个的条目。
角色2线程:
PER_IO_DATA* PerIOData=0;
GetQueuedCompletionStatus(...&PerIOData...);
if (PerIOData->op_type == OP_KILL){ return ; } //从线程中自然return,就是优雅的退出线程。
【大头的错误处理】
GetQueuedCompletionStatus函数的错误处理比较复杂。
1 如果GetQueuedCompletionStatus返回false:
1.1 如果Overlapped指针非空
恭喜你,你投递的异步IO获得结果了,只不过是失败的结果。好孬也终于回来个信儿了。
这可能是socket连接断了等等。
1.1.1 如果GetLastError获得的错误号为ERROR_OPERATION_ABORTED
一定是有东西调用了CancelIO(socket)了。所有同这个socket相关的异步IO请求都会被取消。
1.1.2 如果GetLastError 获得的错误号为其他的东西
可能是IO没成功,如socket连接断开了等等。
1.2 如果Overlapped指针空
这可不是好消息,因为这意味着IOCP本身有重大故障了。比如我们意外的把IOCP的句柄CloseHandle了。
1.2.1 如果GetLastError获得的错误号为WAIT_TIMEOUT
可能GetQueuedCompletionStatus设置的超时参数dwMilliseconds不是INFINITE。我们继续调用GetQueuedCompletionStatus重新等待吧。
1.2.1 如果GetLastError获得的错误号ERROR_ABANDONED_WAIT_0, 或者其他
IOCP本身都完蛋了,角色2线程应另找东家了,或者就地自我了断算了。
2 如果GetQueuedCompletionStatus返回true:
恭喜你,异步IO成功了。
通过lpNumberOfBytes, lpCompletionKey, and lpOverlapped这三个参数获得详细信息。
lpNumberOfBytes:实际传输的字节数。(可能比需要传输的字节数少)
lpCompletionKey:这就是著名的PerHandleData,可以知道这是哪个socket连接的。
lpOverlapped: 这就是著名的PER_IO_DATA, 同某次异步IO调用关联,
比如某次WSASend(Overlapped参数=0x123)调用,这里能重新拿到lpOverlapped==0x123。
我们可以根据这个指针,得知这个IO结果是对应着哪次WSASend()调用的结果。
我满以为这个错误处理天衣无缝,直到有一次测试。我对一个socke投递了100个WSARecv。当我故意把客户端关闭后,这些异步IO不出意外的都在角色2线程的GetQueuedCompletionStatus函数处获得结果了。令我吃惊的是,GetQueuedCompletionStatus返回为TRUE!!!,并且GetLastError()返回值是0!!!
令我欣慰的是lpNumberOfBytes值为0(否则真见鬼了)。所以看到GetQueuedCompletionStatus返回true,不要高兴的太早了。
2.1 把lpOverlapped指针解释成PER_IO_DATA数据结构。如果PerIOData->op_type == OP_KILL,可能这个是PostQueuedCompletionStatus伪造的一个IO完成事件。
2.2 判断是否(lpNumberOfBytes==0)。如果这个IO结果的确是某个WSAxxx()的结果,而不是PostQueuedCompletionStatus伪造的,则这个IO对应的socket可能断了。
2.3 (lpNumberOfBytes>0) ,这才是真正的IO完成的事件呢。可能99.9%的机会,分支跑到这里的。
【在同一个socket上一次投递多个异步IO】
一次投递多个WSASend(1234,&Buff1,...); WSASend(1234,&Buff2,...); ... 好像没问题。
如果一次投递多个WSARecv(1234,&Buff1,...);WSARecv(1234,&Buff2,...);好像有些需要阐明的问题。
第一:Windows保证按照你投递WSARecv的顺序,把网络上到达的数据按先后顺序放入Buff1,Buff2。
如果网络上到来的数据为 AAAAUUUU, 假设Buff1长度4,Buff2长度4,
则保证Buff1获得AAAA,Buff2获得UUUU
第二:如果有多个角色2线程,可能由于线程调度的“竞争条件race condition”,
某线程首先执行Buff2的完成处理过程。
如果我在角色2线程中,打印出收到的数据,可能打印出如下结果:UUUUAAAA。这绝不是违反了TCP协议, 而是多线程的问题。其实解决方案很简单。说者费事,上伪代码
typedef struct
{
OVERLAPPED Overlapped;
...
int Package_Number; //我对每一次IO,夹带本次调用顺序号
...
} PER_IO_DATA;
PER_IO_DATA* PerIOData1=...
PerIOData1->Package_Number = 1 ; //第一次调用
WSARecv(1234, &Buff1,...PerIOData1...);
PER_IO_DATA* PerIOData2=...
PerIOData1->Package_Number = 2 ; //第二次调用
WSARecv(1234, &Buff2,...PerIOData2...);
我们需要维护某种数据结构,记住我们发出了两个WSARecv。
当收到IO结果后,程序需要判断,只有1,2两个调用都从角色2线程获得结果后,才能按顺序把Buff1和Buff2拼接,就是符合顺序的AAAAUUUU。当然,还有其他更好的方式,这里只展示基本原理。
第三:真有必对同一个socket一次投递多个WSARecv吗?
这个问题同【IOCP系统资源耗尽的问题】,不矛盾。我们假设在投递多个WSARecv时,已经预见到网络上将到来某个socket的大量数据。 根据网络资料介绍,这样可以充分发挥多CPU并发运算的能力。我想在双核CPU机器上,一个CPU处理Buff1,同时另一个CPU处理Buff2。
如果是少量客户端连接,每个连接可能突然发生大量数据的传送,这个做法可能能加快从Socket缓冲区拷贝数据到应用程序Buff的速度(个人揣测)。
如果是大量客户端(10000)连接,每个连接传送的数据量很少,这个做法我个人认为没什么意义。我想CPU数量就2个,不会轻易就闲下来吧?
有一个重要原因,需要投递多个buffer给windows。假如我预计到某个socket一次传过来2M的数据,而
我没有2M大小的buffer,我只有1M大小的buffer。我需要先调用一次WSARecv,等待收完这1M数据后,再发一个
WSARecv。或者我用其他方法,提供给windows系统2个1M的buff。
第四:假设我们真需要一次投递多个Buff,接收数据,有必要用多次WSARecv调用吗?
这里有个可能的替代做法,上伪代码:
char *raw1 = new char[BUFF_SIZE];
WSABUF[2] wsabuf;
wsabuf[0].buf = raw1 ;
wsabuf[0].len = BUFF_SIZE;
char *raw2 = new char[BUFF_SIZE];
wsabuf[1].buf = raw2 ;
wsabuf[1].len = BUFF_SIZE;
WSARecv(1234, &wsabuf, 2 ... );
//重点在参数2上,指示了WSABUF结构体的个数是2个。一般大量IOCP的例子里这个参数都是1
这个方法我认为更简单,不知道是我自己“2”还是网上的其他人“2”,一次发出多个WSARecv,把这些分散的IO收集起来也是费事的事。UNIX系统的scatter-gather IO类似于这个机制。
|
|