如何在 Python 中同步 rx 管道?
How to synchronize rx pipes in Python?
我使用 RxPY 来处理文件,并希望按顺序执行一系列加载管道:
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

# NOTE: each .subscribe() call returns right away and the work is scheduled on
# pool_scheduler, so the three pipelines below are not sequenced -- they run
# concurrently. This is the behavior the question describes.
rx.from_list(independing_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: logger.info("independing frames loaded!"),
)

# Only files whose table name resolves to 'mellow' go through the apples dataflow.
withdraw_file = [
    candidate
    for candidate in filtered_files
    if self._table_name_on_contain(candidate) == 'mellow'
]
rx.from_list(withdraw_file).pipe(
    self._build_apples_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: logger.info("apples loaded!"),
)

rx.from_list(depending_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: self._complete_action(),
)
但结果出乎我的意料:由于我没有设置任何"停止点"(stop-points),各个管道似乎是异步并发运行的。我希望第二个和第三个管道仅在第一个管道完成后才开始。该如何解决?
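您可以使用 multiprocessing.Event 来同步您的管道。让第一个管道在 on_completed 时设置事件,主线程先调用 event.wait() 阻塞,直到事件被设置后再启动后续管道(如果只是在同一进程内的线程之间同步,threading.Event 即可):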
# Sketch: gate the later pipelines on completion of the first one.
# `...` is a placeholder for each pipeline's source/operators (not runnable as written).
event = multiprocessing.Event()
# The first pipeline's on_completed callback sets the event.
rx.pipe(...).subscribe(on_completed=event.set)
# Block the calling thread until the first pipeline has completed.
# NOTE(review): only on_completed sets the event; if the pipeline errors,
# this wait() never returns. Also pass an on_error handler that sets it.
event.wait()
# These are only started after event.wait() has returned.
rx.pipe(...)
rx.pipe(...)
按照上面的建议,我使用事件(Event)作为障碍点:主线程等待当前管道完成后,再启动下一个管道。类似下面这样的代码效果很好:
import logging
import threading
import rx
from EventMonitoringETL.tools import logging_config
from rx import operators as ops
import multiprocessing
import rx.scheduler as scheduler
def _run_pipeline_and_wait(source, pool_scheduler):
    """Subscribe to *source* on *pool_scheduler* and block until it terminates.

    Every emitted item is printed together with the id of the thread that
    delivered it.

    Raises:
        Exception: the error the pipeline signalled through ``on_error``.
    """
    # A fresh threading.Event per run: we only synchronize threads of this
    # process, and a new event avoids the wait()/clear() bookkeeping.
    done = threading.Event()
    errors = []

    def on_error(exc):
        errors.append(exc)
        # Wake the waiter on failure too; otherwise wait() would hang forever.
        done.set()

    source.pipe(
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda i: print(f'{i} - {threading.get_ident()}'),
        on_error=on_error,
        on_completed=done.set,
    )
    done.wait()
    if errors:
        raise errors[0]


if __name__ == '__main__':
    logging_config.InitLogging()
    thread_count = multiprocessing.cpu_count()
    thread_pool_scheduler = scheduler.ThreadPoolScheduler(thread_count)

    # (values, marker printed once that batch has fully completed)
    # NOTE(review): 110/210/310 look like typos for 20/30/40; kept as-is.
    batches = [
        ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), "AAAAA"),
        ((11, 12, 13, 14, 15, 16, 17, 18, 19, 110), "BBBBB"),
        ((21, 22, 23, 24, 25, 26, 27, 28, 29, 210), "CCCCC"),
        ((31, 32, 33, 34, 35, 36, 37, 38, 39, 310), "DDDDDDD"),
    ]
    for values, marker in batches:
        # Each batch starts only after the previous one has completed.
        _run_pipeline_and_wait(rx.of(*values), thread_pool_scheduler)
        print(marker)
我使用 RxPY 来处理文件,并希望按顺序执行一系列加载管道:
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

# NOTE: each .subscribe() call returns right away and the work is scheduled on
# pool_scheduler, so the three pipelines below are not sequenced -- they run
# concurrently. This is the behavior the question describes.
rx.from_list(independing_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: logger.info("independing frames loaded!"),
)

# Only files whose table name resolves to 'mellow' go through the apples dataflow.
withdraw_file = [
    candidate
    for candidate in filtered_files
    if self._table_name_on_contain(candidate) == 'mellow'
]
rx.from_list(withdraw_file).pipe(
    self._build_apples_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: logger.info("apples loaded!"),
)

rx.from_list(depending_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: self._complete_action(),
)
但结果出乎我的意料:由于我没有设置任何"停止点"(stop-points),各个管道似乎是异步并发运行的。我希望第二个和第三个管道仅在第一个管道完成后才开始。该如何解决?
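您可以使用 multiprocessing.Event 来同步您的管道。让第一个管道在 on_completed 时设置事件,主线程先调用 event.wait() 阻塞,直到事件被设置后再启动后续管道(如果只是在同一进程内的线程之间同步,threading.Event 即可):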
# Sketch: gate the later pipelines on completion of the first one.
# `...` is a placeholder for each pipeline's source/operators (not runnable as written).
event = multiprocessing.Event()
# The first pipeline's on_completed callback sets the event.
rx.pipe(...).subscribe(on_completed=event.set)
# Block the calling thread until the first pipeline has completed.
# NOTE(review): only on_completed sets the event; if the pipeline errors,
# this wait() never returns. Also pass an on_error handler that sets it.
event.wait()
# These are only started after event.wait() has returned.
rx.pipe(...)
rx.pipe(...)
按照上面的建议,我使用事件(Event)作为障碍点:主线程等待当前管道完成后,再启动下一个管道。类似下面这样的代码效果很好:
import logging
import threading
import rx
from EventMonitoringETL.tools import logging_config
from rx import operators as ops
import multiprocessing
import rx.scheduler as scheduler
def _run_pipeline_and_wait(source, pool_scheduler):
    """Subscribe to *source* on *pool_scheduler* and block until it terminates.

    Every emitted item is printed together with the id of the thread that
    delivered it.

    Raises:
        Exception: the error the pipeline signalled through ``on_error``.
    """
    # A fresh threading.Event per run: we only synchronize threads of this
    # process, and a new event avoids the wait()/clear() bookkeeping.
    done = threading.Event()
    errors = []

    def on_error(exc):
        errors.append(exc)
        # Wake the waiter on failure too; otherwise wait() would hang forever.
        done.set()

    source.pipe(
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda i: print(f'{i} - {threading.get_ident()}'),
        on_error=on_error,
        on_completed=done.set,
    )
    done.wait()
    if errors:
        raise errors[0]


if __name__ == '__main__':
    logging_config.InitLogging()
    thread_count = multiprocessing.cpu_count()
    thread_pool_scheduler = scheduler.ThreadPoolScheduler(thread_count)

    # (values, marker printed once that batch has fully completed)
    # NOTE(review): 110/210/310 look like typos for 20/30/40; kept as-is.
    batches = [
        ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), "AAAAA"),
        ((11, 12, 13, 14, 15, 16, 17, 18, 19, 110), "BBBBB"),
        ((21, 22, 23, 24, 25, 26, 27, 28, 29, 210), "CCCCC"),
        ((31, 32, 33, 34, 35, 36, 37, 38, 39, 310), "DDDDDDD"),
    ]
    for values, marker in batches:
        # Each batch starts only after the previous one has completed.
        _run_pipeline_and_wait(rx.of(*values), thread_pool_scheduler)
        print(marker)