Thread.Interrupt 相当于任务 TPL

Thread.Interrupt equivalent for Task TPL

一些背景:我的 C# 代码调用了一些执行阻塞等待的非托管代码 (C++)。然而,阻塞等待是可警示的(比如 Thread.Sleep - 我想它在掩护下调用 WaitForSingleObjectExbAlertable TRUE);我确定它是可警报的,因为它可以被 QueueUserAPC.

“唤醒”

如果我可以简单地使用托管线程,我会调用阻塞方法,然后在需要退出时使用Thread.Interrupt“唤醒”线程;像这样:

void ThreadFunc() {
    try {
           Message message;
           comObject.GetMessage(out message);
           //....
     }
     catch (ThreadInterruptedException) {
        // We need to exit
        return;
     }
}

var t - new Thread(ThreadFunc);
//....
t.Interrupt();

(注意:我没有使用此代码,但据我所知,它可以用于这种特殊情况(非托管代码中的警报等待超出我的控制)。我是什么在 TPL 中寻找的是最好的等效项(或 更好 替代品!)。

但我必须使用 TPL(任务而不是托管线程),非托管方法不受我控制(我无法修改它以调用 WaitForMultipleObjectEx 并使其成为 return 时例如,我发出事件信号)。

我正在寻找 Thread.Interrupt 任务的等价物(将 post 底层线程上的 APC)。 AFAIK,CancellationTokens 要求代码是“任务感知”的,并且不使用这种技术,但我不确定:发生了什么,我想知道,如果任务执行 Thread.Sleep(我知道有一个 Task.Wait,但它只是为了有一个非任务等待的例子,它是可警告的),它可以被取消吗?

我的假设是错误的吗(我的意思是,我可以只使用 CT 一切都会起作用吗?但是如何呢?)。

如果没有这样的方法...我愿意接受建议。我真的很想避免混合线程和任务,或使用 P/Invoke,但如果没有其他方法,我仍然希望以“最干净”的方式进行(这意味着:没有粗鲁的中止, 还有一些“Tasky”:) )

编辑:

对于那些好奇的人,我已经“确认”Thread.Interrupt 可以在我的情况下工作,因为它调用 QueueUserAPC。 它调用 InterruptInternal, then Thread::UserInterrupt, then Alert,使 APC 排队。它实际上非常聪明,因为它允许您 sleep/wait 然后唤醒一个线程,而无需使用另一个同步原语。

我只需要找到遵循相同流程的 TPL 原语

I wonder, if a task does a Thread.Sleep (I know there is a Task.Wait, but it's just for having an example of a non-task wait which is alertable), can it be cancelled?

不,不能。任务的取消由用户定义。它是合作取消,需要用户明确检查 CancellationToken

的状态

请注意 Task.Wait 的重载需要 CancellationToken:

/// <summary>
/// Waits for the task to complete, for a timeout to occur, 
/// or for cancellation to be requested.
/// The method first spins and then falls back to blocking on a new event.
/// </summary>
/// <param name="millisecondsTimeout">The timeout.</param>
/// <param name="cancellationToken">The token.</param>
/// <returns>true if the task is completed; otherwise, false.</returns>
private bool SpinThenBlockingWait(int millisecondsTimeout, 
                                  CancellationToken cancellationToken)
{
    bool infiniteWait = millisecondsTimeout == Timeout.Infinite;
    uint startTimeTicks = infiniteWait ? 0 : (uint)Environment.TickCount;
    bool returnValue = SpinWait(millisecondsTimeout);
    if (!returnValue)
    {
        var mres = new SetOnInvokeMres();
        try
        {
            AddCompletionAction(mres, addBeforeOthers: true);
            if (infiniteWait)
            {
                returnValue = mres.Wait(Timeout.Infinite,
                                        cancellationToken);
            }
            else
            {
                uint elapsedTimeTicks = ((uint)Environment.TickCount) -
                                               startTimeTicks;
                if (elapsedTimeTicks < millisecondsTimeout)
                {
                    returnValue = mres.Wait((int)(millisecondsTimeout -
                                             elapsedTimeTicks), cancellationToken);
                }
            }
        }
        finally
        {
            if (!IsCompleted) RemoveContinuation(mres);
            // Don't Dispose of the MRES, because the continuation off
            // of this task may still be running.  
            // This is ok, however, as we never access the MRES' WaitHandle,
            // and thus no finalizable resources are actually allocated.
        }
    }
    return returnValue;
}

它将尝试在特定条件下旋转线程。如果这还不够,它最终会调用 Monitor.Wait 这实际上会阻塞:

/*========================================================================
** Waits for notification from the object (via a Pulse/PulseAll). 
** timeout indicates how long to wait before the method returns.
** This method acquires the monitor waithandle for the object 
** If this thread holds the monitor lock for the object, it releases it. 
** On exit from the method, it obtains the monitor lock back. 
** If exitContext is true then the synchronization domain for the context 
** (if in a synchronized context) is exited before the wait and reacquired 
**
** Exceptions: ArgumentNullException if object is null.
========================================================================*/
[System.Security.SecurityCritical]  // auto-generated
[ResourceExposure(ResourceScope.None)]
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, Object obj);

目前,所有现有的生产 CLR 主机 implement one-to-one managed-to-unmanaged thread mapping。对于 Windows Desktop OS 系列尤其如此,您的遗留 COM 对象 运行s.

鉴于此,您可以使用 TPL 的 Task.Run 而不是经典的线程 API,并且仍然通过 p/invoke 调用 QueueUserAPC 以将您的 COM 对象从可变等待状态中释放,当取消令牌已被触发。

下面的代码展示了如何做到这一点。需要注意的一件事是,所有 ThreadPool 线程(包括由 Task.Run 启动的线程)在 COM MTA apartment. Thus, the COM object needs to support the MTA model without implicit COM marshaling. If it isn't the case, you'll probably need a custom task scheduler (like StaTaskScheduler 下隐式 运行)代替 Task.Run.

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        static int ComGetMessage()
        {
            NativeMethods.SleepEx(2000, true);
            return 42;
        }

        static int GetMessage(CancellationToken token)
        {
            var apcWasCalled = false;
            var gcHandle = default(GCHandle);
            var apcCallback = new NativeMethods.APCProc(target => 
            {
                apcWasCalled = true;
                gcHandle.Free();
            });

            var hCurThread = NativeMethods.GetCurrentThread();
            var hCurProcess = NativeMethods.GetCurrentProcess();
            IntPtr hThread;
            if (!NativeMethods.DuplicateHandle(
                hCurProcess, hCurThread, hCurProcess, out hThread,
                0, false, NativeMethods.DUPLICATE_SAME_ACCESS))
            {
                throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
            }
            try
            {
                int result;
                using (token.Register(() => 
                    {
                        gcHandle = GCHandle.Alloc(apcCallback);
                        NativeMethods.QueueUserAPC(apcCallback, hThread, UIntPtr.Zero);
                    },
                    useSynchronizationContext: false))
                {
                    result = ComGetMessage();
                }
                Trace.WriteLine(new { apcWasCalled });
                token.ThrowIfCancellationRequested();
                return result;
            }
            finally
            {
                NativeMethods.CloseHandle(hThread);
            }
        }

        static async Task TestAsync(int delay)
        {
            var cts = new CancellationTokenSource(delay);
            try
            {
                var result = await Task.Run(() => GetMessage(cts.Token));
                Console.WriteLine(new { result });
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled.");
            }
        }

        static void Main(string[] args)
        {
            TestAsync(3000).Wait();
            TestAsync(1000).Wait();
        }

        static class NativeMethods
        {
            public delegate void APCProc(UIntPtr dwParam);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint SleepEx(uint dwMilliseconds, bool bAlertable);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint QueueUserAPC(APCProc pfnAPC, IntPtr hThread, UIntPtr dwData);

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentThread();

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentProcess();

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool CloseHandle(IntPtr handle);

            public const uint DUPLICATE_SAME_ACCESS = 2;

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool DuplicateHandle(IntPtr hSourceProcessHandle,
               IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle,
               uint dwDesiredAccess, bool bInheritHandle, uint dwOptions);
        }
    }
}