如何使用可观察到的 RxPY 间隔定期调用异步协程?
How to call an async coroutine periodically using an RxPY interval observable?
我需要创建一个 Observable 流,它定期发出异步协程的结果。
intervalRead
是一个 returns Observable 的函数,参数是区间 rate
和一个异步协程函数 fun
,需要在定义的间隔。
我的第一个方法是用interval工厂方法创建一个observable,然后用map调用协程,用from_future把它包裹在一个Observable中,然后得到协程返回的值.
async def foo():
await asyncio.sleep(1)
return 42
def intervalRead(rate, fun) -> Observable:
loop = asyncio.get_event_loop()
return rx.interval(rate).pipe(
map(lambda i: rx.from_future(loop.create_task(fun()))),
)
async def main():
obs = intervalRead(5, foo)
obs.subscribe(
on_next= lambda item: print(item)
)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
然而我得到的输出不是协程的结果,而是from_future返回的Observable,在指定的时间间隔发出
输出:<rx.core.observable.observable.Observable object at 0x033B5650>
我怎样才能得到那个 Observable 返回的实际值?我希望 42
我的第二个方法是创建自定义可观察对象:
def intervalRead(rate, fun) -> rx.Observable:
interval = rx.interval(rate)
def subs(observer: Observer, scheduler = None):
loop = asyncio.get_event_loop()
def on_timer(i):
task = loop.create_task(fun())
from_future(task).subscribe(
on_next= lambda i: observer.on_next(i),
on_error= lambda e: observer.on_error(e),
on_completed= lambda: print('coro completed')
)
interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))
return rx.create(subs)
但是,订阅 from_future(task)
永远不会发出值,为什么会发生这种情况?
然而,如果我这样写 intervalRead
:
def intervalRead(rate, fun):
loop = asyncio.get_event_loop()
task = loop.create_task(fun())
return from_future(task)
我得到了预期的结果:42
。显然这并不能解决我的问题,但它让我感到困惑,为什么它在我的第二种方法中不起作用?
最后,我使用 rx.concurrency CurrentThreadScheduler
尝试了第三种方法,并使用 schedule_periodic
方法定期安排一个动作。然而,我面临着与第二种方法相同的问题。
def funWithScheduler(rate, fun):
loop = asyncio.get_event_loop()
scheduler = CurrentThreadScheduler()
subject = rx.subjects.Subject()
def action(param):
obs = rx.from_future(loop.create_task(fun())).subscribe(
on_next= lambda item: subject.on_next(item),
on_error= lambda e: print(f'error in action {e}'),
on_completed= lambda: print('action completed')
)
obs.dispose()
scheduler.schedule_periodic(rate,action)
return subject
如果能深入了解我所缺少的内容或任何其他完成我需要的建议,我将不胜感激。这是我使用 asyncio 和 RxPY 的第一个项目,我只在 angular 项目的上下文中使用 RxJS,因此欢迎任何帮助。
你的第一个例子几乎可以工作了。只需进行两项更改即可使其正常工作:
首先,from_future 的结果是一个发出单个项目的可观察对象(未来完成时的值)。所以 map 的输出是一个更高阶的 observable(一个发出 observable 的 observable)。这些子 observables 可以通过在 map 之后使用 merge_all 运算符,或者使用 flat_map 而不是 map.
来展平。
然后间隔运算符必须在 AsyncIO 循环上安排其计时器,默认情况下并非如此:默认调度程序是 TimeoutScheduler,它会生成一个新线程。所以在原来的代码中,任务不能调度到AsyncIO事件循环上,因为create_task是从另一个线程调用的。在调用 subscribe 时使用 scheduler 参数声明用于整个操作员链的默认调度程序。
以下代码有效(每 5 秒打印 42):
import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
async def foo():
await asyncio.sleep(1)
return 42
def intervalRead(rate, fun) -> rx.Observable:
loop = asyncio.get_event_loop()
return rx.interval(rate).pipe(
ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
ops.merge_all()
)
async def main(loop):
obs = intervalRead(5, foo)
obs.subscribe(
on_next=lambda item: print(item),
scheduler=AsyncIOScheduler(loop)
)
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
我需要创建一个 Observable 流,它定期发出异步协程的结果。
intervalRead
是一个 returns Observable 的函数,参数是区间 rate
和一个异步协程函数 fun
,需要在定义的间隔。
我的第一个方法是用interval工厂方法创建一个observable,然后用map调用协程,用from_future把它包裹在一个Observable中,然后得到协程返回的值.
async def foo():
await asyncio.sleep(1)
return 42
def intervalRead(rate, fun) -> Observable:
loop = asyncio.get_event_loop()
return rx.interval(rate).pipe(
map(lambda i: rx.from_future(loop.create_task(fun()))),
)
async def main():
obs = intervalRead(5, foo)
obs.subscribe(
on_next= lambda item: print(item)
)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
然而我得到的输出不是协程的结果,而是from_future返回的Observable,在指定的时间间隔发出
输出:<rx.core.observable.observable.Observable object at 0x033B5650>
我怎样才能得到那个 Observable 返回的实际值?我希望 42
我的第二个方法是创建自定义可观察对象:
def intervalRead(rate, fun) -> rx.Observable:
interval = rx.interval(rate)
def subs(observer: Observer, scheduler = None):
loop = asyncio.get_event_loop()
def on_timer(i):
task = loop.create_task(fun())
from_future(task).subscribe(
on_next= lambda i: observer.on_next(i),
on_error= lambda e: observer.on_error(e),
on_completed= lambda: print('coro completed')
)
interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))
return rx.create(subs)
但是,订阅 from_future(task)
永远不会发出值,为什么会发生这种情况?
然而,如果我这样写 intervalRead
:
def intervalRead(rate, fun):
loop = asyncio.get_event_loop()
task = loop.create_task(fun())
return from_future(task)
我得到了预期的结果:42
。显然这并不能解决我的问题,但它让我感到困惑,为什么它在我的第二种方法中不起作用?
最后,我使用 rx.concurrency CurrentThreadScheduler
尝试了第三种方法,并使用 schedule_periodic
方法定期安排一个动作。然而,我面临着与第二种方法相同的问题。
def funWithScheduler(rate, fun):
loop = asyncio.get_event_loop()
scheduler = CurrentThreadScheduler()
subject = rx.subjects.Subject()
def action(param):
obs = rx.from_future(loop.create_task(fun())).subscribe(
on_next= lambda item: subject.on_next(item),
on_error= lambda e: print(f'error in action {e}'),
on_completed= lambda: print('action completed')
)
obs.dispose()
scheduler.schedule_periodic(rate,action)
return subject
如果能深入了解我所缺少的内容或任何其他完成我需要的建议,我将不胜感激。这是我使用 asyncio 和 RxPY 的第一个项目,我只在 angular 项目的上下文中使用 RxJS,因此欢迎任何帮助。
你的第一个例子几乎可以工作了。只需进行两项更改即可使其正常工作:
首先,from_future 的结果是一个发出单个项目的可观察对象(未来完成时的值)。所以 map 的输出是一个更高阶的 observable(一个发出 observable 的 observable)。这些子 observables 可以通过在 map 之后使用 merge_all 运算符,或者使用 flat_map 而不是 map.
来展平。然后间隔运算符必须在 AsyncIO 循环上安排其计时器,默认情况下并非如此:默认调度程序是 TimeoutScheduler,它会生成一个新线程。所以在原来的代码中,任务不能调度到AsyncIO事件循环上,因为create_task是从另一个线程调用的。在调用 subscribe 时使用 scheduler 参数声明用于整个操作员链的默认调度程序。
以下代码有效(每 5 秒打印 42):
import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
async def foo():
await asyncio.sleep(1)
return 42
def intervalRead(rate, fun) -> rx.Observable:
loop = asyncio.get_event_loop()
return rx.interval(rate).pipe(
ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
ops.merge_all()
)
async def main(loop):
obs = intervalRead(5, foo)
obs.subscribe(
on_next=lambda item: print(item),
scheduler=AsyncIOScheduler(loop)
)
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()