在 Windows 中的另一个线程上异步启动和取消 I/O 的无竞争方式

Race-free way to asynchronously start AND cancel I/O on another thread in Windows

背景:一般来说,如果我们想强制一个操作异步发生(避免阻塞主线程),使用FILE_FLAG_OVERLAPPED是不够的,因为操作仍然可以同步完成。

因此,为了避免这种情况,我们将操作推迟到专用于 I/O 的工作线程。这样可以避免阻塞主线程。

现在主线程可以使用CancelIoEx(HANDLE, LPOVERLAPPED)取消worker发起的I/O操作(比如,通过ReadFile)。

然而,要CancelIoEx成功,主线程需要一种方法来保证操作实际上已经开始,否则没有什么可以取消的。

这里最明显的解决方案是让工作线程在调用之后设置一个事件,例如ReadFile returns,但这现在让我们回到最初的问题:因为 ReadFile 可以阻塞,我们首先就失去了拥有工作线程的全部目的,这是为了确保主线程不会在 I/O.

上被阻塞

解决这个问题的“正确”方法是什么?有没有一种好方法可以实际强制 I/O 操作异步发生,同时仍然能够请求它的当 I/O 尚未完成时,稍后以无竞争的方式取消?

我唯一能想到的就是设置一个计时器,在 I/O 还没有完成时定期调用 CancelIoEx,但这看起来非常丑陋。是否有 better/more 可靠的解决方案?

你通常需要做下一步:

  • 你用来异步的每个文件句柄I/O封装到 一些 c/c++ object(我们将其命名为 IO_OBJECT

  • 这个object需要有引用计数

  • 在开始异步操作之前 I/O - 你需要分配另一个 object,其中封装了OVERLAPPEDIO_STATUS_BLOCK(让名字 它 IO_IRP) 在 IO_IRP 中存储指向 IO_OBJECT 的引用指针 和特定的 io 信息 - I/O 代码(读、写等)缓冲区 指针,..

  • 检查I/O操作的return代码以确定,将是I/O 回调(排队到 iocp 或 apc 的数据包)或如果操作失败(将 无回调)- 仅使用错误代码

    自行调用回调
  • I/O 管理器保存您在 IRP 结构中传递给 I/O 的指针 (UserApcContext) 和 当 I/O 完成时将它传回给你(如果使用 win32 api 这个指针 如果本机 api 指向 OVERLAPPED 的相等指针 - 您可以直接通过 自己控制这个指针)

  • when I/O finishid(如果不是同步在开始时失败) - 回调 最终 I/O 状态将被调用

  • 这里你得到了指向 IO_IRP (OVERLAPPED) 的指针 - 的调用方法 IO_OBJECT 并释放引用,删除 IO_IRP

  • 如果你在某个时候可以提前关闭 object handle(不是在 析构函数) - 实现一些 run-down 保护,用于不访问 关闭后处理

  • run-down protection和weak-refefence很像,可惜没有 用户模式api,但不难自己实现

从任何线程,如果您有指向 object 的指针(当然是引用),您可以调用 CancelIoEx 或关闭 object 句柄 - 如果文件有 IOCP,最后一次文件句柄已关闭 - 所有 I/O 操作都将被取消。但是对于关闭 - 你不需要直接调用 CloseHandle 而是开始 run-down 并在 run-down 完成时调用 CloseHandle (在一些 ReleaseRundownProtection 调用中(这是演示名称,没有这样的api)

一些最小的典型实现:

class __declspec(novtable) IO_OBJECT 
{
    friend class IO_IRP;

    virtual void IOCompletionRoutine(
        ULONG IoCode, 
        ULONG dwErrorCode, 
        ULONG dwNumberOfBytesTransfered, 
        PVOID Pointer) = 0;
    
    void AddRef();
    void Release();

    HANDLE _hFile = 0;
    LONG _nRef = 1;
    //...
};


class IO_IRP : public OVERLAPPED 
{
    IO_OBJECT* _pObj;
    PVOID Pointer;
    ULONG _IoCode;
    
    IO_IRP(IO_OBJECT* pObj, ULONG IoCode, PVOID Pointer) : 
        _pObj(pObj), _IoCode(IoCode), Pointer(Pointer)
    {
        pObj->AddRef();
    }
    
    ~IO_IRP()
    {
        _pObj->Release();
    }
    
    VOID CALLBACK IOCompletionRoutine(
        ULONG dwErrorCode,
        ULONG dwNumberOfBytesTransfered,
        )
    {
        _pObj->IOCompletionRoutine(_IoCode, 
            dwErrorCode, dwNumberOfBytesTransfered, Pointer);

        delete this;
    }

    static VOID CALLBACK FileIOCompletionRoutine(
        ULONG status,
        ULONG dwNumberOfBytesTransfered,
        LPOVERLAPPED lpOverlapped
        )
    {
        static_cast<IO_IRP*>(lpOverlapped)->IOCompletionRoutine(
            RtlNtStatusToDosError(status), dwNumberOfBytesTransfered);
    }

    static BOOL BindIoCompletion(HANDLE hObject)
    {
        return BindIoCompletionCallback(hObject, FileIOCompletionRoutine, 0));
    }
    
    void CheckErrorCode(ULONG dwErrorCode)
    {
        switch (dwErrorCode)
        {
        case NOERROR:
        case ERROR_IO_PENDING:
            return ;
        }
        IOCompletionRoutine(dwErrorCode, 0);
    }
    
    void CheckError(BOOL fOk)
    {
        return CheckErrorCode(fOk ? NOERROR : GetLastError());
    }
};


///// start some I/O // no run-downprotection on file

if (IO_IRP* irp = new IO_IRP(this, 'some', 0))
{
    irp->CheckErrorCode(ReadFile(_hFile, buf, cb, 0, irp));
}

///// start some I/O // with run-downprotection on file

if (IO_IRP* irp = new IO_IRP(this, 'some', 0))
{
    ULONG dwError = ERROR_INVALID_HANDLE;
    
    if (AcquireRundownProtection())
    {
        dwError = ReadFile(_hFile, buf, cb, 0, irp) ? NOERROR : GetLastError();
        ReleaseRundownProtection();
    }
    
    irp->CheckErrorCode(dwError);
}

some更完整的实现


However, for CancelIoEx to succeed, the main thread needs a way to guarantee that the operation has in fact started, otherwise there is nothing to cancel.

是的,尽管您可以随时安全地调用 CancelIoEx,即使文件中没有活动的 I/O,事实上另一个线程可以在您调用之后开始新的 I/O 操作CancelIoEx。通过此调用,您可以取消当前已知的单次启动操作。例如 - 您开始连接 ConnectEx 并更新 UI(启用 取消 按钮)。当 ConnectEx 完成时 - 您 post 向 UI 发送消息(禁用 取消 按钮)。如果用户按 Cancel 直到 I/O (ConnectEx) ative - 你调用 CancelIoEx - 结果连接将被取消或提前正常完成。如果是周期性操作(例如 ReadFile 在循环中)——通常 CancelIoEx 不是停止此类循环的正确方法。相反,您需要从控制线程调用 CloseHandle - 这有效地取消了文件上的所有当前 I/O。


关于 ReadFile 和任何异步 I/O api 的工作方式以及我们是否可以从 api 调用中强制更快 return。

  1. I/O 管理员检查输入参数,转换句柄(文件句柄到 FILE_OBJECT) 指针,检查权限等,如果出现错误 这个阶段 - 错误 return 调用者和 I/O 完成
  2. I/O 经理电话 driver。 driver(或几个drivers - top driver可以 将请求传递给另一个) 处理 I/O 请求 (IRP) 最后 return 到 I/O 经理。它可以 return 或 STATUS_PENDING,这意味着 I/O 仍未完成或完成 I/O(调用 IofCompleteRequest) 和 return 另一种状态。任何状态 其他 比 STATUS_PENDING 意味着 I/O 完成(成功,错误 或取消,但完成)
  3. I/O mahager 检查 STATUS_PENDING 以及文件是否打开 synchronous I/O (flag FO_SYNCHRONOUS_IO ) 开始原地等待, 直到 I/O 完成。如果文件为异步打开 I/O - I/O 经理自己从不等待和呼叫者的 return 状态,包括 STATUS_PENDING

我们可以通过调用 CancelSynchronousIo 中断 3 阶段的等待。但是如果在 2 阶段等待在 driver 内 - 不可能以任何方式打破这个等待。任何 Cancel*Io*CloseHandle 在这里都无济于事。如果我们使用异步文件句柄 - I/O 管理器永远不会在 3 中等待,如果 api 调用等待 - 它会在 2[=159 中等待=](driver 处理程序)我们不能中断等待。

as resutl - 我们不能强制 I/O更快地调用异步文件return。 if driver 在某种情况下会等待。

等等 - 为什么我们不能中断 driver 等待,但可以停止 I/O 管理器等待。因为未知 - 如何,object(或只是睡眠),driver 等待的条件。如果我们在条件满足之前中断线程等待会怎样……所以如果 driver 等待 - 它将等待。如果 I/O 经理 - 他等待 IRP 补偿埃特。要打破这种等待 - 需要完整的 IRP。为此存在 api,它将 IRP 标记为已取消并调用 driver 回调(driver 必须设置此回调以防它 return 在完成请求之前)。 driver 在这个回调完成的 IRP 中,这是从等待中唤醒 I/O 管理器(再次它只等待同步文件)和 return 到调用者

也很重要不要混淆 - I/O 结束和 api 通话结束。如果是同步文件 - 这是相同的。 api return 仅在 I/O 完成后编辑。但对于异步 I/O 这是不同的事情 - I/O 仍然可以激活,在 api 调用是 return 之后(如果它 return STATUS_PENDINGERROR_IO_PENDING 用于 win32 层)。

我们可以通过取消它来要求 I/O 提前完成。通常(如果 driver 设计良好)这项工作。但是我们不能要求 api 尽早调用 return 以防异步 I/O 文件。我们无法控制 I/O 调用的时间和速度(在具体情况下为 ReadFile)return。但可以提前取消 I/O request after I/O call (ReadFile) return 。更确切地说,在 2 driver return 之后,因为 I/O 经理从不在 3 等待 - 可以说I/O 在 driver return 控制之后调用 return。


如果一个线程使用文件句柄,而另一个线程可以在没有任何同步的情况下关闭它——这当然会导致随机数和错误。在最好的情况下,ERROR_INVALID_HANDLE 可以从 api 调用 returned,在另一个线程关闭句柄之后。在最坏的情况下 - 句柄可以在关闭后重用,我们开始使用错误的句柄并产生未定义的结果。为了防止这种情况,只需要在 run-down 保护内部使用句柄(类似于 convert 对 strong 的弱引用)。 演示实现:

class IoObject
{
    HANDLE _hFile = INVALID_HANDLE_VALUE;
    LONG _lock = 0x80000000;

public:
    HANDLE LockHandle() 
    {
        LONG Value, PreviousValue;

        if (0 > (Value = _lock))
        {
            do 
            {
                PreviousValue = InterlockedCompareExchangeNoFence(&_lock, Value + 1, Value);

                if (PreviousValue == Value) return _hFile;

            } while (0 > (Value = PreviousValue));
        }
    
        return 0;
    }

    void UnlockHandle()
    {
        if (InterlockedDecrement(&_lock) == 0)
        {
            _hFile = 0; // CloseHandle(_hFile)
        }
    }

    void Close()
    {
        if (LockHandle())
        {
            _interlockedbittestandreset(&_lock, 31);
            UnlockHandle();
        }
    }

    void WrongClose()
    {
        _hFile = 0; // CloseHandle(_hFile)
    }

    BOOL IsHandleClosed()
    {
        return _hFile == 0;
    }
};

ULONG WINAPI WorkThread(IoObject* pObj)
{
    ULONG t = GetTickCount();
    int i = 0x1000000;
    do 
    {
        if (HANDLE hFile = pObj->LockHandle())
        {
            SwitchToThread(); // simulate delay

            if (pObj->IsHandleClosed())
            {
                __debugbreak();
            }

            pObj->UnlockHandle();
        }
        else
        {
            DbgPrint("[%x]: handle closed ! (%u ms)\n", GetCurrentThreadId(), GetTickCount() - t);
            break;
        }
    } while (--i);

    return 0;
}

ULONG WINAPI WorkThreadWrong(IoObject* pObj)
{
    ULONG t = GetTickCount();
    int i = 0x10000000;
    do 
    {
        if (pObj->IsHandleClosed())
        {
            DbgPrint("[%x]: handle closed ! (%u ms)\n", GetCurrentThreadId(), GetTickCount() - t);
            break;
        }
        
        SwitchToThread(); // simulate delay

        if (pObj->IsHandleClosed())
        {
            __debugbreak();
        }

    } while (--i);

    return 0;
}

void CloseTest()
{
    IoObject obj;

    ULONG n = 8;
    do 
    {
        if (HANDLE hThread = CreateThread(0, 0x1000, (PTHREAD_START_ROUTINE)WorkThread, &obj, 0, 0))
        {
            CloseHandle(hThread);
        }
    } while (--n);

    Sleep(50);
//#define _WRONG_
#ifdef _WRONG_
    obj.WrongClose();
#else
    obj.Close();
#endif
    MessageBoxW(0,0,0,0);
}

WrongClose(); 调用我们永久将在 WorkThread[Wrong] 中捕获 __debugbreak()(关闭后使用)。但是对于 obj.Close();WorkThread 我们绝不能捕获异常。另请注意 Close()lock-free 并且它的调用者永远不会 wait/hang 即使 api 在 rundown-protection 内调用将等待。