我应该在 Airflow 中使用 Python 原生多线程还是多任务?

Should I use Python native multithread or Multiple Tasks in Airflow?

我正在将 .NET 应用程序重构为气流。这个 .NET 应用程序使用多线程从 mongoDB 中提取和处理数据(如果没有多线程,该过程需要大约 10 小时,使用多线程我可以减少这个时间)。

在 mongoDB 上的每个文档中,我都有一个名为 process 的键值。该值用于控制哪个线程处理文档。我将开发一个 Airflow DAG 来优化这个过程。我的疑问是关于性能和执行此操作的最佳方法。

我的应用程序应该有多个任务(我会在python方法的输入中控制process变量)。或者我应该只使用 1 个任务并在这个任务中使用 Python 多线程?下图说明了我的疑问。

Multi Task X Single Task (Multi Threading)

我知道使用 MultiTask 我将进行更多的数据库读取(每个任务 1 次)。虽然,使用 Python 多线程我知道我必须在 de task 方法中进行大量控制处理。最好、最快和优化的方法是什么?

这实际上取决于您处理的性质。

Python 中的多线程可能会因为 GIL(全局解释器锁)而受到限制 - 有些操作需要独占锁,这会限制它可以实现的并行度。特别是如果您混合使用 CPU 和 I/O 操作,结果可能是线程等待锁花费了大量时间。但这真的取决于你做什么——你需要试验看看 GIL 是否影响你的多线程。

多处理(Airflow 将其用于本地执行器)更好,因为每个进程 运行 实际上是一个单独的 Python 解释器。所以每个进程都有自己的 GIL——以使用的资源为代价(每个进程使用自己的内存、套接字等)。 Airlfow 中的每个任务都将 运行 在一个单独的进程中。

然而,Airflow 提供的功能更多一些——它还提供多机功能。您可以 运行 将工作人员与 Y 机器上的 X 个进程分开,有效地 运行 一次最多连接 X*Y 个进程。

不幸的是,Airflow(目前)不太适合 运行 动态数量的相同类型的并行任务。具体来说,如果您想将负载拆分为 N 个部分,并且 运行 每个部分都在一个单独的任务中 - 这只有在 N 恒定并且对于同一个 DAG 不随时间变化的情况下才真正有效(就像你知道你有 10 台机器,有 4 CPUs,你通常希望一次 运行 10*4 = 40 个任务,所以你必须将你的工作分成 40 个任务。而且它不能改变真的在 运行 之间动态 - 每次 运行 都必须将 DAG 写入 运行 40 个并行任务。

不确定我是否提供了帮助,但没有单一的“最佳优化”答案 - 您需要进行试验并检查最适合您的情况。