如何在 Python 中同步 rx 管道?

How to synchronize rx pipes in Python?

我使用 RxPy 来处理文件,想构建一组按先后顺序加载文件的管道:

# Thread pool shared by all three pipelines, sized to the number of CPUs.
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

    # Pipeline 1: files with no dependencies.
    # NOTE(review): subscribe_on() hands the subscription to the thread pool,
    # so subscribe() presumably returns before the files are processed and
    # nothing below waits for on_completed -- this matches the behaviour the
    # question describes.
    rx.from_list(independing_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("independing frames loaded!"))

    # Collect the files whose table name (per _table_name_on_contain) is 'mellow'.
    withdraw_file = []
    for file in filtered_files:
        if self._table_name_on_contain(file) == 'mellow':
            withdraw_file += [file]

    # Pipeline 2: the 'mellow' files, run through a dedicated dataflow.
    # It is subscribed right away; it does not wait for pipeline 1.
    rx.from_list(withdraw_file).pipe(
        self._build_apples_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("apples loaded!"))

    # Pipeline 3: files that depend on the earlier loads; on completion it
    # calls self._complete_action(). It is also subscribed without waiting.
    rx.from_list(depending_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: self._complete_action())

但是我得到了一个意料之外的结果:由于我没有设置任何"停止点"(stop-points),每个管道似乎都是异步运行的。我希望第二个和第三个管道仅在第一个管道完成之后才开始。该如何解决?

您可以使用 multiprocessing.Event 来同步您的管道:

# Flag the main thread blocks on until the first pipeline reports completion.
event = multiprocessing.Event()

# First pipeline: its completion callback sets the event.
# NOTE(review): only on_completed sets the event; if the pipeline ends with
# an error instead, the wait() below would never return.
rx.pipe(...).subscribe(on_completed=event.set)

# Block here until on_completed has fired.
event.wait()

# The dependent pipelines are only created after the wait has returned.
rx.pipe(...)
rx.pipe(...)

正如上面所说,我使用一个事件对象(Event)作为同步点:主线程在订阅每个管道之后阻塞等待,直到该管道完成才继续执行。下面是我的做法,效果很好。

import logging
import threading

import rx
from EventMonitoringETL.tools import logging_config
from rx import operators as ops
import multiprocessing
import rx.scheduler as scheduler
def _run_and_wait(source, pool):
    """Subscribe to *source* on *pool* and block until it terminates.

    Each emitted item is printed together with the id of the thread that
    delivered it. The call returns once the observable has completed.

    Args:
        source: The observable to run.
        pool: Scheduler on which the subscription is performed.

    Raises:
        Exception: Whatever error the observable terminated with, re-raised
            in the calling thread.
    """
    # The workers are threads of the same process, so a threading.Event is
    # enough. A fresh event per call avoids having to clear() a shared one.
    finished = threading.Event()
    failures = []

    def _on_error(exc):
        # Release the waiting thread on failure as well; otherwise an error
        # in the pipeline would leave finished.wait() blocked forever.
        failures.append(exc)
        finished.set()

    source.pipe(
        ops.subscribe_on(pool),
    ).subscribe(
        lambda i: print(f'{i} - {threading.get_ident()}'),
        on_error=_on_error,
        on_completed=finished.set,
    )

    finished.wait()
    if failures:
        raise failures[0]


if __name__ == '__main__':
    logging_config.InitLogging()
    logger = logging.getLogger('c4t_etl')

    thread_count = multiprocessing.cpu_count()
    thread_pool_scheduler = scheduler.ThreadPoolScheduler(thread_count)

    # Each stage is (values to emit, marker printed once the stage is done).
    stages = (
        ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), "AAAAA"),
        ((11, 12, 13, 14, 15, 16, 17, 18, 19, 110), "BBBBB"),
        ((21, 22, 23, 24, 25, 26, 27, 28, 29, 210), "CCCCC"),
        ((31, 32, 33, 34, 35, 36, 37, 38, 39, 310), "DDDDDDD"),
    )

    # Stages run strictly one after another: the next one is only subscribed
    # after the previous one has signalled completion.
    for values, marker in stages:
        _run_and_wait(rx.of(*values), thread_pool_scheduler)
        print(marker)