TaskCompletionSource<T> (.NET) 的 Python 等价物是什么?
What would be a Python equivalent of TaskCompletionSource<T> (.NET)?
我正在尝试实现以下伪代码(部分灵感来自 .NET 中 TaskCompletionSource<T>
可能实现的功能),其中 objective 用于等待特定事件收到继续执行(或抛出一个TimeoutError
):
from axel import Event
def _wait_for_provider_up(self, data_provider: Provider, ms_timeout: int) -> bool:
if provider.State == ProviderState.Connected: return True
if provider.State == ProviderState.Faulted: return False
taskCompletion = TaskCompletionSource<bool>()
def onStateChanged(sender, e: ProviderStateChangedEventArgs):
if e.State == ProviderState.Connected:
taskCompletion.TrySetResult(True)
elif e.State == ProviderState.Faulted:
taskCompletion.TrySetResult(False)
provider.StateChanged += onStateChanged
try:
if provider.State == ProviderState.Connected: return True
elif provider.State == ProviderState.Faulted: return False
if not taskCompletion.Task.Wait(ms_timeout) or not taskCompletion.Task.Result or provider.State != ProviderState.Connected:
return False
finally:
provider.StateChanged -= onStateChanged
return True
推荐的 pythonic(或 .NET-ish 但 python 兼容)实现该目标的方法是什么?
假设提供者是一个 python 线程。
您可以使用等待线程竞争或超时的 join(timeout) 方法。
join() 总是 returns None 所以要检查线程是否还活着你需要使用 isAlive()。
这是实现 taskCompletion
的一个选项
def taskCompletion(thread,timeout):
thread.join(timeout)
return isAlive()
我已尝试以 asyncio
方式重写您的代码,但请注意:函数 _wait_for_provider_up
已更改为 coroutine
:
from axel import Event
async def _wait_for_provider_up(self, data_provider: Provider, ms_timeout: int) -> bool:
if provider.State == ProviderState.Connected:
return True
if provider.State == ProviderState.Faulted:
return False
taskCompletion = asyncio.Future()
def onStateChanged(sender, e: ProviderStateChangedEventArgs):
if e.State == ProviderState.Connected:
taskCompletion.set_result(True)
elif e.State == ProviderState.Faulted:
taskCompletion.set_result(False)
provider.StateChanged += onStateChanged
try:
if provider.State == ProviderState.Connected:
return True
elif provider.State == ProviderState.Faulted:
return False
try:
result = await asyncio.wait_for(taskCompletion, timeout=ms_timeout/1000)
if not result or provider.State != ProviderState.Connected:
return False
except asyncio.TimeoutError:
return False
finally:
provider.StateChanged -= onStateChanged
return True
代码仍然不是很pythonic,但我希望你能明白我的意思。
python 等价于 asyncio.Future
。
import asyncio
source = asyncio.Future()
async def await_concat(x):
return x + await source
async def set_after_two_seconds():
print("READY")
await asyncio.sleep(2)
print("SET")
source.set_result("GO")
print(asyncio.get_event_loop().run_until_complete(asyncio.gather(
await_concat("PASS "),
set_after_two_seconds(),
)))
## prints
# READY
## (note: two second pause here)
# SET
# ['PASS GO', None]
我正在尝试实现以下伪代码(部分灵感来自 .NET 中 TaskCompletionSource<T>
可能实现的功能),其中 objective 用于等待特定事件收到继续执行(或抛出一个TimeoutError
):
from axel import Event
def _wait_for_provider_up(self, data_provider: Provider, ms_timeout: int) -> bool:
if provider.State == ProviderState.Connected: return True
if provider.State == ProviderState.Faulted: return False
taskCompletion = TaskCompletionSource<bool>()
def onStateChanged(sender, e: ProviderStateChangedEventArgs):
if e.State == ProviderState.Connected:
taskCompletion.TrySetResult(True)
elif e.State == ProviderState.Faulted:
taskCompletion.TrySetResult(False)
provider.StateChanged += onStateChanged
try:
if provider.State == ProviderState.Connected: return True
elif provider.State == ProviderState.Faulted: return False
if not taskCompletion.Task.Wait(ms_timeout) or not taskCompletion.Task.Result or provider.State != ProviderState.Connected:
return False
finally:
provider.StateChanged -= onStateChanged
return True
推荐的 pythonic(或 .NET-ish 但 python 兼容)实现该目标的方法是什么?
假设提供者是一个 python 线程。 您可以使用等待线程竞争或超时的 join(timeout) 方法。 join() 总是 returns None 所以要检查线程是否还活着你需要使用 isAlive()。 这是实现 taskCompletion
的一个选项def taskCompletion(thread,timeout):
thread.join(timeout)
return isAlive()
我已尝试以 asyncio
方式重写您的代码,但请注意:函数 _wait_for_provider_up
已更改为 coroutine
:
from axel import Event
async def _wait_for_provider_up(self, data_provider: Provider, ms_timeout: int) -> bool:
if provider.State == ProviderState.Connected:
return True
if provider.State == ProviderState.Faulted:
return False
taskCompletion = asyncio.Future()
def onStateChanged(sender, e: ProviderStateChangedEventArgs):
if e.State == ProviderState.Connected:
taskCompletion.set_result(True)
elif e.State == ProviderState.Faulted:
taskCompletion.set_result(False)
provider.StateChanged += onStateChanged
try:
if provider.State == ProviderState.Connected:
return True
elif provider.State == ProviderState.Faulted:
return False
try:
result = await asyncio.wait_for(taskCompletion, timeout=ms_timeout/1000)
if not result or provider.State != ProviderState.Connected:
return False
except asyncio.TimeoutError:
return False
finally:
provider.StateChanged -= onStateChanged
return True
代码仍然不是很pythonic,但我希望你能明白我的意思。
python 等价于 asyncio.Future
。
import asyncio
source = asyncio.Future()
async def await_concat(x):
return x + await source
async def set_after_two_seconds():
print("READY")
await asyncio.sleep(2)
print("SET")
source.set_result("GO")
print(asyncio.get_event_loop().run_until_complete(asyncio.gather(
await_concat("PASS "),
set_after_two_seconds(),
)))
## prints
# READY
## (note: two second pause here)
# SET
# ['PASS GO', None]