从 python 函数调用到异步方法的实时标准输出重定向
Real time stdout redirect from a python function call to an async method
所以我有一个非常耗时的函数调用 my_heavy_function
我需要将该输出重定向到调用它的 Web 界面,我有一个方法可以将消息发送到 Web 界面,让我们调用该方法 async push_message_to_user()
.
基本上是这样的
import time
def my_heavy_function():
time_on = 0
for i in range(20):
time.sleep(1)
print(f'time spend {time_on}')
time_on = time_on+1
async def push_message_to_user(message:str):
# some lib implementation
pass
if __name__ == "__main__":
my_heavy_function() # how push prints to the UI ?
也许有一种方法可以给出 my_heavy_function(stdout_obj)
并使用那个“std_object”(StringIO) 来做类似 stdout_object.push(f'time spend {time_on}')
的事情。我可以做到这一点,但我无法通过异步版本更改 my_heavy_function()
,直接添加 push_message_to_user()
而不是 print
(它被其他非 ascyn 例程使用)
我想要的是(伪代码)
with contextlib.redirect_output(my_prints):
my_heavy_function()
while my_prints.readable():
# realtime push
await push_message_to_user(my_prints)
谢谢!
感谢@HTF
的评论我终于设法解决了janus的问题。我复制了 repo 的示例,并进行了修改以接收可变数量的消息(因为我不知道 my_heavy_function()
将使用多少次迭代)
import asyncio
import janus
import time
def my_heavy_function(sync_q):
for i in range(10):
sync_q.put(i)
time.sleep(1)
sync_q.put('end') # is there a more elegant way to do this ?
sync_q.join()
async def async_coro(async_q):
while True:
val = await async_q.get()
print(f'arrived {val}')
# send val to UI
# await push_message_to_user(val)
async_q.task_done()
if val == 'end':
break
async def main():
queue = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, my_heavy_function, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
asyncio.run(main())
所以我有一个非常耗时的函数调用 my_heavy_function
我需要将该输出重定向到调用它的 Web 界面,我有一个方法可以将消息发送到 Web 界面,让我们调用该方法 async push_message_to_user()
.
基本上是这样的
import time
def my_heavy_function():
time_on = 0
for i in range(20):
time.sleep(1)
print(f'time spend {time_on}')
time_on = time_on+1
async def push_message_to_user(message:str):
# some lib implementation
pass
if __name__ == "__main__":
my_heavy_function() # how push prints to the UI ?
也许有一种方法可以给出 my_heavy_function(stdout_obj)
并使用那个“std_object”(StringIO) 来做类似 stdout_object.push(f'time spend {time_on}')
的事情。我可以做到这一点,但我无法通过异步版本更改 my_heavy_function()
,直接添加 push_message_to_user()
而不是 print
(它被其他非 ascyn 例程使用)
我想要的是(伪代码)
with contextlib.redirect_output(my_prints):
my_heavy_function()
while my_prints.readable():
# realtime push
await push_message_to_user(my_prints)
谢谢!
感谢@HTF
的评论我终于设法解决了janus的问题。我复制了 repo 的示例,并进行了修改以接收可变数量的消息(因为我不知道 my_heavy_function()
将使用多少次迭代)
import asyncio
import janus
import time
def my_heavy_function(sync_q):
for i in range(10):
sync_q.put(i)
time.sleep(1)
sync_q.put('end') # is there a more elegant way to do this ?
sync_q.join()
async def async_coro(async_q):
while True:
val = await async_q.get()
print(f'arrived {val}')
# send val to UI
# await push_message_to_user(val)
async_q.task_done()
if val == 'end':
break
async def main():
queue = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, my_heavy_function, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
asyncio.run(main())