Future inside future always pending
P.S. I opened an issue for this: https://github.com/robinhood/faust/issues/702
I am developing a Faust application:
from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different folders where archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status
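The method sink.send_soon returns a FutureMessage(asyncio.Future, Awaitable[RecordMetadata]), and that future always stays in the pending state.
This is a case of a future inside another future.
Note: the function handle has to be synchronous, because an async function cannot be passed to ProcessPoolExecutor. The method send_soon is a synchronous method. According to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47 awaiting it is not required.
Is there any way to handle the pending future?
I also tried this: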
import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # `send_soon` is a plain (non-async) def, whereas `send` is async.
    # An async variant cannot be used here because it fails with
    # `await loop.run_in_executor(executor, handle, dir_) TypeError: cannot pickle 'coroutine' object`
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>
But that did not work either.
It seems the event loop never even gets a chance to run the send_soon method.
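To illustrate the suspicion above, here is a minimal sketch using plain asyncio rather than Faust. It assumes that the FutureMessage behaves like an ordinary asyncio.Future whose completion is scheduled on an event loop; the function name `demo_pending_future` and the value "delivered" are made up for the illustration. A future whose loop is not running stays pending, and it only completes once the loop gets to run:

```python
import asyncio


def demo_pending_future():
    # A private loop that is deliberately not running at first,
    # similar to a worker process in which no event loop is running.
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        # Schedule the completion; the callback fires only when the loop runs.
        loop.call_soon(fut.set_result, "delivered")
        done_before = fut.done()
        # Running the loop lets the scheduled callback execute.
        result = loop.run_until_complete(fut)
        done_after = fut.done()
    finally:
        loop.close()
    return done_before, result, done_after


states = demo_pending_future()
print(states)
```

If the same holds for send_soon, a call made inside a ProcessPoolExecutor worker would never resolve, because nothing in that process drives the loop that is supposed to deliver the message.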
So I changed the code structure:
import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic1')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    # use the loop that is actually running this coroutine
    loop = asyncio.get_running_loop()
    directory = await loop.run_in_executor(executor, handle, dir_)
    # the send now happens in the main process, on the running event loop
    await sink.send(value={'dir': directory})


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last returning the result, which the caller sends to kafka

    return directory
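The same structure can be tried without a Kafka broker. The sketch below is only an approximation of the code above: `fake_send` is a hypothetical stand-in for `await sink.send(...)`, `process_all` and the record shape are invented for the example, and `handle` just returns a small dict instead of doing the real archive work. The point it shows is that the blocking function only returns data, and every send is awaited in the process that owns the running loop.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def handle(directory):
    # Stand-in for the blocking work; must be picklable and synchronous.
    return {"dir": directory, "count": len(directory)}


async def fake_send(sent, value):
    # Hypothetical replacement for `await sink.send(value=value)`.
    await asyncio.sleep(0)
    sent.append(value)


async def run_dir_handling(executor, sent, dir_):
    loop = asyncio.get_running_loop()
    # The heavy work runs in the executor; only its result comes back.
    record = await loop.run_in_executor(executor, handle, dir_)
    # The send is awaited here, on the running event loop.
    await fake_send(sent, record)


async def process_all(executor, dirs):
    sent = []
    await asyncio.gather(*(run_dir_handling(executor, sent, d) for d in dirs))
    return sent


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as pool:
        print(asyncio.run(process_all(pool, ["dir1", "dir2", "dir3"])))
```

Passing the executor in as an argument keeps the coroutine code independent of the pool type, so the same functions can be exercised with a thread pool when process spawning is inconvenient.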