皇家称呼:在c#使用IOCP(完成端口)的简单示例

来源:百度文库 编辑:偶看新闻 时间:2024/05/05 15:40:40

蛙蛙推荐:在c#使用IOCP(完成端口)的简单示例

蛙蛙推荐:在c#使用IOCP(完成端口)的简单示例
上次给大家发了利用winsock原生的api来做一个同步的socket服务器的例子,大致上只是贴了一些代码,相信大家这么冰雪聪明,已经研究的差不多了。因为winsock的api使用在msdn或者google上都能很方便的查到,所以我没太多罗嗦代码的原理。但是c#进行平台调用方面是有一些经验的,单靠google和msdn及社区的力量有时候不容易得到答案。这次给大家演示一下利用IOCP的在线程间传递数据的例子,顺便打算讲一些细节和注意的地方。

概述:这里主要使用IOCP的三个API,CreateIoCompletionPort,PostQueuedCompletionStatus,GetQueuedCompletionStatus,第一个是用来创建一个完成端口对象,第二个是向一个端口发送数据,第三个是接受数据,基本上用着三个函数,就可以写一个使用IOCP的简单示例。

其中完成端口一个内核对象,所以创建的时候会耗费性能,CPU得切换到内核模式,而且一旦创建了内核对象,我们都要记着要不用的时候显式的释放它的句柄,释放非托管资源的最佳实践肯定是使用Dispose模式,这个博客园有人讲过N次了。而一般要获取一个内核对象的引用,最好用SafeHandle来引用它,这个类可以帮你管理引用计数,而且用它引用内核对象,代码更健壮,如果用指针引用内核对象,在创建成功内核对象并复制给指针这个时间段,如果抛了ThreadAbortException,这个内核对象就泄漏了,而用SafeHandle去应用内核对象就不会在赋值的时候发生ThreadAbortException。另外SafeHandle类继承自CriticalFinalizerObject类,并实现了IDispose接口,CLR对CriticalFinalizerObject及其子类有特殊照顾,比如说在编译的时候优先编译,在调用非CriticalFinalizerObject类的Finalize方法后再调用CriticalFinalizerObject类的Finalize类的Finalize方法等。在win32里,一般一个句柄是-1或者0的时候表示这个句柄是无效的,所以.net有一个SafeHandle的派生类SafeHandleZeroOrMinusOneIsInvalid ,但是这个类是一个抽象类,你要引用自己使用的内核对象或者非托管对象,要从这个类派生一个类并重写Relseas方法。另外在.net框架里它有两个实现几乎一模一样的子类,一个是SafeFileHandle一个是SafeWaitHandle,前者表示文件句柄,后者表示等待句柄,我们这里为了方便就直接用SafeFileHandle来引用完成端口对象了。

CreateIoCompletionPort函数的原型如下
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);

FileHandle参数表示要绑定在完成端口上的句柄,比如说一个已经accept的socket句柄。

ExistingCompletionPort参数表示一个已有的完成端口句柄,第一次创建完成端口的时候显然随便传个值就行,所以这个参数直接定义成IntPtr类型了。当你创建了工作线程来为I/O请求服务的时候,才会把句柄和完成端口关联在一起,而之前第一次创建完成端口的时候这个参数传一个zero指针就O了,而FileHandle参数传一个-1的指针就行了。

CompletionKey是完成键的意思,它可以是任意想传递给工作线程的数据,学名叫做单句柄数据,就是说跟随FileHandle参数走的一些状态数据,一般在socket的iocp程序里是把socket传进去,以便在工作线程里拿到这个socket句柄,在收到异步操作完成的通知及处理后继续进行下一个异步操作的投递,如发送和接受数据等。

NumberOfConcurrentThreads参数表示在一个完成端口上同时允许执行的最大线程数量。如果传0,就是说你有几个CPU,就是允许最大有几个线程,这也是最理想情况,因为一个CPU一个线程可以防止线程上下文切换。关于这个值要和创建工作线程的数量的关系,大家要理解清楚,不一定CPU有多少个,你的工作线程就创建多少个。因为你的工作线程有时候会阻塞或者等待,而如果你正好创建了CPU个数个工作线程,有一个等待的话,因为你分配了同时最多有CPU个数多个最大IOCP线程,这时候就不能效率最大化了。所以一般工作线程创建的要比CPU个数多一些,除非你保证你的工作线程不会阻塞。

PostQueuedCompletionStatus函数原型如下
 [DllImport("Kernel32", CharSet = CharSet.Auto)]
    private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
该方法用于给完成端口投递自定义信息,一般情况下如果把某个句柄和完成端口绑定后,当有数据收发操作完成时会自动同时工作线程,工作线程里的GetQueuedCompletionStatus就不会阻塞,而继续往下走,来进行接收到IO操作完成通知的流程。而有时候我们需要手工向工作者线程投递一些消息,比如说我们主线程知道所有的socket句柄都关闭了,工作线程可以退出了,我们就可以给工作线程发一个自定义数据,工作线程收到后判断是否是退出指令,然后退出。

CompletionPort参数表示向哪个完成端口对象投递信息,在这个完成端口上等待消息的工作线程就会收到消息了。
dwNumberOfBytesTransferred表示你投递的数据有多大,我们一般投递的是一个对象的指针,在32位系统里,int指针就是4个字节了,直接写4就O了,要不就用sizeof你传的数据,如sizeof(IntPtr)。

dwCompletionKey同CreateIoCompletionPort的解释,是单句柄数据,本示例用不到,不细说,直接用IntPtr.Zero填充了事。

lpOverlapped参数,本意是一个win32的overlapped结构的指针,本示例中不用,所以不详细讲。它叫单IO数据,是相对单据并拘束CompletionKey来讲的,前者是一个句柄的每次IO操作的上下文,比如单词IO操作的数据、操作类型等,后者是整个句柄的上下文。但这里我们表示你要投递的数据,可以是任何类型的数据(谁让它是个指针呢,所以传啥都行),值得注意的一点就是,这个数据传递到工作线程的时候,中间这个数据走的是非托管代码。所以不能直接传一个引用进去,这里要使用到GCHandle类。先大致介绍一下这个类吧。它有个静态方法Alloc来给把一个对象在GC句柄表里注册,GC句柄表示CLR为没个应用程序域提供的一个表,它允许你来监视和管理对象的生命周期,你可以往里加一个对象的引用,也可以从里面移除一个对象,往里加对象的时候,还可以指定一个标记来表示我们希望如何监视和控制这个对象。而加入一个条目的操作就是GCHandle的Alloc对象,它有两个参数,第一个参数是对象,第二参数是GCHandleType类型的枚举,第二个参数表示我们如何来监视和控制这个对象的生命周期。当这个参数是GCHandleType.Normal时,表示我们告诉垃圾收集器,及时托管代码里没有该对象的根,也不要回收该对象,但垃圾收集器可以移动它,一般我们向非托管代码传递一个对象,而又从非托管代码传递回来的时候用这个类型非常好,它不会让垃圾收集器在非托管代码返回托管代码的时候回收掉该对象,还不怎么影响GC的性能,因为GC还可以移动它。dwCompletionKey就是我们在托管-非托管-托管之间传递的一个很典型的场景。所以这里用它,另外还有GCHandleType.Pinned,它和GCHandleType.Normal不同的一点就是GC除了在没有根的时候不能回收这个对象外,还不能移动它,应用场景是给非托管代码传递一个byte[]的buffer,让托管代码去填充,如果用GCHandleType.Normal有可能在非托管代码返回托管代码的时候写错内存位置,因为有可能GC移动了这个对象的内存地址。关于根、GC原理,大家可以参考相关资料。另外在你的数据从非托管代码传递会托管代码后,要调用GCHandle的实例方法free来在GC句柄表里移除该对象,这时候你的托管代码还有个该对象的引用,也就是根,GC也不会给你回收的,当你用完了后,GC就给你回收了。GCHandle的Target属性用来访问GCHandle指向的对象。其它两个GCHandleType的成员是关于弱引用的,和本文关系不大,就不介绍了。

GetQueuedCompletionStatus原型如下
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
  public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
      out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
      out IntPtr lpOverlapped, uint dwMilliseconds);
前几个参数和PostQueuedCompletionStatus差不多,
CompletionPort表示在哪个完成端口上等待PostQueuedCompletionStatus发来的消息,或者IO操作完成的通知,

lpNumberOfBytesTransferred表示收到数据的大小,这个大小不是说CompletionKey的大小,而是在单次I/O操作完成后(WSASend或者WSAReceve),实际传输的字节数,我在这里理解的不是很透彻,我觉得如果是接受PostQueuedCompletionStatus的消息的话,应该是收到lpOverlapped的大小,因为它才是单IO数据嘛。

lpCompletionKey用来接收单据并数据,我们没传递啥,后来也没用,在socket程序里,一般接socket句柄。

lpOverlapped用来接收单IO数据,或者我们的自定义消息。

dwMilliseconds表示等待一个自定义消息或者IO完成通知消息在完成端口上出现的时间,传递INIFINITE(0xffffffff)表示无限等待下去。

好了,API大概介绍这么多,下面介绍代码
1、主线程创建一个完成端口对象,不和任何句柄绑定,前几个参数都写0,NumberOfConcurrentThreads参数我们写1,因为我们的示例就一个工作线程。
2、创建一个工作线程,把第一步创建的完成端口传进去
3、创建两个单IO数据,分别发投递给第一步创建的完成端口
4、在工作线程里执行一个死循环,循环在传递进来的完成端口上等待消息,没有消息的时候GetQueuedCompletionStatus处于休息状态,有消息来的时候把指针转换成对象,然后输出
5、如果收到退出指令,就退出循环,从而结束工作者线程。
下面是完整代码,需要打开不安全代码的编译选项。

using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

[StructLayout(LayoutKind.Sequential)]
class PER_IO_DATA
{
    public string Data;
}

public class IOCPApiTest
{
    [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
    public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
    [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
    public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
        out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
        out IntPtr lpOverlapped, uint dwMilliseconds);
    [DllImport("Kernel32", CharSet = CharSet.Auto)]
    private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);

    public static unsafe void TestIOCPApi()
    {
        var CompletionPort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
        if(CompletionPort.IsInvalid)
        {
            Console.WriteLine("CreateIoCompletionPort 出错:{0}",Marshal.GetLastWin32Error());
        }
        var thread = new Thread(ThreadProc);
        thread.Start(CompletionPort);

        var PerIOData = new PER_IO_DATA() ;
        var gch = GCHandle.Alloc(PerIOData);
        PerIOData.Data = "hi,我是蛙蛙王子,你是谁?";
        Console.WriteLine("{0}-主线程发送数据",Thread.CurrentThread.GetHashCode());
        PostQueuedCompletionStatus(CompletionPort, (uint)sizeof(IntPtr), IntPtr.Zero, (IntPtr)gch);

        var PerIOData2 = new PER_IO_DATA();
        var gch2 = GCHandle.Alloc(PerIOData2);
        PerIOData2.Data = "关闭工作线程吧";
        Console.WriteLine("{0}-主线程发送数据", Thread.CurrentThread.GetHashCode());
        PostQueuedCompletionStatus(CompletionPort, 4, IntPtr.Zero, (IntPtr)gch2);
        Console.WriteLine("主线程执行完毕");
        Console.ReadKey();
    }
    static void ThreadProc(object CompletionPortID)
    {
        var CompletionPort = (SafeFileHandle)CompletionPortID;

        while (true)
        {
            uint BytesTransferred;
            IntPtr PerHandleData;
            IntPtr lpOverlapped;
            Console.WriteLine("{0}-工作线程准备接受数据",Thread.CurrentThread.GetHashCode());
            GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
                                      out PerHandleData, out lpOverlapped, 0xffffffff);
            if(BytesTransferred <= 0)
                continue;
            GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);
            var per_HANDLE_DATA = (PER_IO_DATA)gch.Target;
            Console.WriteLine("{0}-工作线程收到数据:{1}", Thread.CurrentThread.GetHashCode(), per_HANDLE_DATA.Data);
            gch.Free();
            if (per_HANDLE_DATA.Data != "关闭工作线程吧") continue;
            Console.WriteLine("收到退出指令,正在退出");
            CompletionPort.Dispose();
            break;
        }
    }

    public static int Main(String[] args)
    {
        TestIOCPApi();
        return 0;
    }
}




--------------------------------------------------------------


#22楼2011-03-10 13:04 | Parky       蛙蛙的这篇是google第一篇耶. 贴个我写的IOCP包装类, 应该效率比较高了, 而且不需要unsafe.
用法很简单:
123456789101112131415161718static void Main(string[] args){    IOCP<string> testIOCP = new IOCP<string>(2, IOCPThreadFunction);    Console.ReadKey();    for (int i = 0; i < 100; i++)    {        testIOCP.PostEvent("Tester" + i);    }    Console.ReadKey();    testIOCP.Dispose();    Console.WriteLine("IOCP closed.");    Console.ReadKey();} static public void IOCPThreadFunction(string iValue){    Console.WriteLine(iValue);}

IOCP的代码如下:
汗竟然说我评论太长, 下一个再贴
 回复 引用 查看    #23楼2011-03-10 13:07 | Parky       这是IOCP的源代码
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110/// /// Win32 IOCP Wrapper with mannual post event./// /// Type of which be posted to the IOCPpublic class IOCP : IDisposable{    private IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1);    private IntPtr SHUTDOWN_IOCPTHREAD = IntPtr.Zero;    private const UInt32 INIFINITE = 0xffffffff;     IntPtr _iocpHandle;    bool _disposing = false;    Action _userFunction;    ManualResetEvent[] _waithandles;    uint _concurentNumber;     ///     /// Create an IOCP    ///     /// Concurrent thread number    /// User-function to process a dispatched item    public IOCP(uint concurrentNumber, Action userFucntion)    {        _concurentNumber = concurrentNumber;        _userFunction = userFucntion;         // Create an IO Completion Port        _iocpHandle = Win32IOCPWrapper.CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, 0, concurrentNumber);         if (_iocpHandle == IntPtr.Zero)        {            throw new Exception("Unable to create IOCP: Error " + Marshal.GetLastWin32Error());        }         List handles = new List();        for (int i = 0; i < concurrentNumber; i++)        {            Thread trd = new Thread(IOCPThread);            trd.Name = "IOCP Thread " + trd.ManagedThreadId;            ManualResetEvent waitHandle = new ManualResetEvent(false);            handles.Add(waitHandle);            trd.Start(waitHandle);        }        _waithandles = handles.ToArray();    }        void IOCPThread(object obj)    {        IntPtr pOverlapped = new IntPtr();        uint uiNumberOfBytes = 0;        ulong iValue = 0;        ManualResetEvent waitHandle = obj as ManualResetEvent;         while (true)        {            bool succ = Win32IOCPWrapper.GetQueuedCompletionStatus(_iocpHandle, ref uiNumberOfBytes, ref iValue, ref pOverlapped, INIFINITE);            if (!succ)            {                throw new Exception("GetQueuedCompletionStatus Error: Code " + Marshal.GetLastWin32Error());            }            if (iValue == 0)                break;            GCHandle handle = GCHandle.FromIntPtr(new IntPtr((long)iValue));            T item = (T)handle.Target;            handle.Free();            _userFunction(item);        }        waitHandle.Set();    }     ///     /// Post an item to the IOCP    ///     /// Item to post    public void PostEvent(T item)    {        if (_disposing)        {            return;        }        GCHandle handle = GCHandle.Alloc(item, GCHandleType.Normal);        if (!Win32IOCPWrapper.PostQueuedCompletionStatus(_iocpHandle, (uint)IntPtr.Size, GCHandle.ToIntPtr(handle).ToInt64(), null))        {            throw new Exception("PostQueuedCompletionStatus Error: Code " + Marshal.GetLastWin32Error());        }    }     #region IDisposable 成员    ///     /// Close the IOCP    ///     public void Dispose()    {        if (_disposing)        {            return;        }        _disposing = true;        for (int i = 0; i < _concurentNumber; i++)        {            Win32IOCPWrapper.PostQueuedCompletionStatus(_iocpHandle, 4, 0L, null);        }        WaitHandle.WaitAll(_waithandles);        Win32IOCPWrapper.CloseHandle(_iocpHandle);    }    #endregion}
 回复 引用 查看    #24楼2011-03-10 13:09 | Parky       这是Win32IOCPWrapper:
123456789101112131415161718192021class Win32IOCPWrapper{    // Win32 Function Prototypes    /// Win32Func: Create an IO Completion Port Thread Pool     [DllImport("Kernel32", CharSet = CharSet.Auto)]    public static extern IntPtr CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, UInt64 puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);     /// Win32Func: Closes an IO Completion Port Thread Pool     [DllImport("Kernel32", CharSet = CharSet.Auto)]    public static extern Boolean CloseHandle(IntPtr hObject);     /// Win32Func: Posts a pointer into an IO Completion Port Thread Pool     [DllImport("Kernel32", CharSet = CharSet.Auto)]    public static extern Boolean PostQueuedCompletionStatus(IntPtr hCompletionPort, UInt32 SizeOfArgument, Int64 UserArg, object useless);     /// Win32Func: Waits on a pointer from an IO Completion Port Thread Pool.    ///           All threads in the pool wait in this Win32 Function     [DllImport("Kernel32", CharSet = CharSet.Auto)]    public static extern Boolean GetQueuedCompletionStatus(IntPtr hCompletionPort, ref UInt32 pSizeOfArgument, ref UInt64 pUserArg, ref IntPtr useless, UInt32 Milliseconds); }
 回复 引用 查看    #25楼[楼主]2011-03-10 13:49 | 蛙蛙王子       好,谢谢你,我看看
 回复 引用 查看    #26楼2011-03-10 14:29 | Parky       更改一下, GCHandle.Alloc()里, Alloc类型改为GCHandleType.Normal, 这样T可以是任意类型了. 否则只能pin基元类型

http://www.cnblogs.com/onlytiancai/archive/2008/07/12/iocp_demo_in_csharp.html