Future inside future always pending

P.S. I have opened an issue about this: https://github.com/robinhood/faust/issues/702

I'm developing a Faust application:

from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different folders where the archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z archives containing mdb files
    # converting mdb tables to csv
    # reading csv into a dataframe
    # some data manipulation
    # and finally sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status

The FutureMessage (asyncio.Future, Awaitable[RecordMetadata]) returned by sink.send_soon always stays in the pending state.

This is a case of a future inside another future.

Note: the handle function has to be synchronous, because an async function cannot be passed to ProcessPoolExecutor. The send_soon method itself is synchronous, and according to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47 awaiting it is not required.
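For reference, a minimal sketch of the case where awaiting is not required: when send_soon is called from a coroutine running on the app's own event loop (the app name, topic name, and value below are placeholders), the buffered message should get flushed by the producer and the FutureMessage should eventually resolve.

import faust

app = faust.App('demo-app', broker='kafka://localhost:9092')
topic = app.topic('topic')


@app.timer(interval=5.0)
async def produce():
    # Runs on the app's own event loop, so the producer can flush the buffered
    # message and the returned FutureMessage should resolve without being awaited.
    fut = topic.send_soon(value={'ts': 1234567890, 'count': 10})
    print(fut)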

Is there a way to deal with this pending future?

I also tried this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z archives containing mdb files
    # converting mdb tables to csv
    # reading csv into a dataframe
    # some data manipulation
    # and finally sending dataframe records to kafka

    # `send_soon` is used here because it is not `async def`, while `send` is.
    # The async `send` cannot be used instead: making `handle` async leads to
    # `await loop.run_in_executor(executor, handle, dir_)` failing with
    # `TypeError: cannot pickle 'coroutine' object`.
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>

But that did not work either.

It seems the loop never even gets a chance to process what send_soon schedules.
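If so, it is presumably because ProcessPoolExecutor runs handle in a separate worker process: the worker gets its own copy of the module-level app and sink, so whatever send_soon buffers there is never seen by the producer running on the parent's loop. A rough, non-Faust illustration of that process isolation:

import asyncio
from concurrent.futures import ProcessPoolExecutor

pending = []  # module-level state, copied into each worker process


def worker():
    # This append happens in the child process's copy of `pending`;
    # the parent process never observes it.
    pending.append('message')
    return len(pending)


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as executor:
        print(await loop.run_in_executor(executor, worker))  # 1
    print(len(pending))  # 0 -> the parent's list is untouched


if __name__ == '__main__':
    asyncio.run(main())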

So I changed the code structure to this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic1')


@app.task()
async def check():
    tasks = []

    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    directory = await loop.run_in_executor(executor, handle, dir_)
    await sink.send(value={'dir': directory})  
    

def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z archives containing mdb files
    # converting mdb tables to csv
    # reading csv into a dataframe
    # some data manipulation
    # and finally sending dataframe records to kafka

    return directory
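Since sink.send is awaited on the app's own loop here, the same structure should extend to the real payload as well. Assuming handle can return the prepared records instead of sending them (df.to_dict('records') is only an assumed way to build that list), run_dir_handling could send each one:

async def run_dir_handling(executor, dir_):
    # handle() would return the prepared records instead of calling send_soon;
    # each record is then sent from the app's own event loop.
    records = await loop.run_in_executor(executor, handle, dir_)
    for record in records:
        await sink.send(value=record)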