Apache Spark 中的非线性 (DAG) 机器学习管道
Non linear (DAG) ML pipelines in Apache Spark
我设置了一个简单的 Spark-ML 应用程序,其中我有一个独立的转换器管道,可以将列添加到原始数据的数据帧中。由于转换器不查看彼此的输出,我希望我可以 运行 它们在非线性 (DAG) 管道中并行。关于此功能,我只能找到 the Spark ML-Guide:
中的这一段
It is possible to create non-linear Pipelines as long as the data flow
graph forms a Directed Acyclic Graph (DAG). This graph is currently
specified implicitly based on the input and output column names of
each stage (generally specified as parameters). If the Pipeline forms
a DAG, then the stages must be specified in topological order.
我对该段的理解是,如果我为每个转换器设置 inputCol(s)、outputCol 参数,并在创建管道时按拓扑顺序指定阶段,那么引擎将使用该信息来构建执行DAG s.t。一旦输入准备就绪,DAG 的阶段可以 运行。
关于此的一些问题:
- 我的理解对吗?
- 如果我没有为 stages/transformers 之一指定输出列(例如,该阶段仅过滤某些行),会发生什么情况?它是否会假设出于 DAG 创建目的,该阶段正在更改所有列,因此所有后续阶段都应该等待它?
- 同样,如果我没有为其中一个阶段指定 inputCol(s),会发生什么情况?该阶段会等到所有前面的阶段都完成吗?
- 看来我可以指定多个输入列,但只能指定一个输出列。如果转换器向数据框添加两列会发生什么(Spark 本身对此没有问题)?有没有办法让 DAG 创建引擎知道它?
Is my understanding correct?
不完全是。因为阶段是按拓扑顺序提供的,所以要以正确的顺序遍历图形,您只需从左到右应用 PipelineStages
。这正是您调用 PipelineTransform
.
时发生的情况
阶段序列遍历两次:
- 使用
transformSchema
which is simply implemented as stages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur))
验证架构一次。这是执行实际架构验证的部分。
- 一次拟合实际上使用
Transformers
和拟合 Estimators
转换数据。这只是 a simple for loop which applies stages sequentially one by one.
Likewise, what happens if for one of the stages I don't specify an inputCol(s)?
几乎没有什么有趣的。由于阶段是按顺序应用的,并且唯一的模式验证是由给定的 Transformer
在实际转换开始之前使用其 transformSchema
方法应用的,因此它将像任何其他阶段一样处理。
What happens if a transformer adds two columns to a dataframe
同上。只要它为后续阶段生成有效的输入模式,它就与任何其他 Transformer
.
没有什么不同
transformers don't look at the output of one another I was hoping I could run them in parallel
从理论上讲,您可以尝试构建一个自定义复合转换器,它封装了多个不同的转换,但唯一可以独立执行并从此类操作中受益的部分是模型拟合。在一天结束时,您必须 return 一个单一的转换 DataFrame
可以被下游阶段使用,实际的转换很可能被安排为一个单一的数据扫描。
问题仍然存在,是否真的值得付出努力。虽然可以同时执行多个作业,但如果可用资源量与处理单个作业所需的工作量相比相对较高,则它只能提供一些优势。它通常需要一些低级别的管理(分区数,随机分区数),这不是 Spark 的最强项 SQL。
我设置了一个简单的 Spark-ML 应用程序,其中我有一个独立的转换器管道,可以将列添加到原始数据的数据帧中。由于转换器不查看彼此的输出,我希望我可以 运行 它们在非线性 (DAG) 管道中并行。关于此功能,我只能找到 the Spark ML-Guide:
中的这一段It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order.
我对该段的理解是,如果我为每个转换器设置 inputCol(s)、outputCol 参数,并在创建管道时按拓扑顺序指定阶段,那么引擎将使用该信息来构建执行DAG s.t。一旦输入准备就绪,DAG 的阶段可以 运行。
关于此的一些问题:
- 我的理解对吗?
- 如果我没有为 stages/transformers 之一指定输出列(例如,该阶段仅过滤某些行),会发生什么情况?它是否会假设出于 DAG 创建目的,该阶段正在更改所有列,因此所有后续阶段都应该等待它?
- 同样,如果我没有为其中一个阶段指定 inputCol(s),会发生什么情况?该阶段会等到所有前面的阶段都完成吗?
- 看来我可以指定多个输入列,但只能指定一个输出列。如果转换器向数据框添加两列会发生什么(Spark 本身对此没有问题)?有没有办法让 DAG 创建引擎知道它?
Is my understanding correct?
不完全是。因为阶段是按拓扑顺序提供的,所以要以正确的顺序遍历图形,您只需从左到右应用 PipelineStages
。这正是您调用 PipelineTransform
.
阶段序列遍历两次:
- 使用
transformSchema
which is simply implemented asstages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur))
验证架构一次。这是执行实际架构验证的部分。 - 一次拟合实际上使用
Transformers
和拟合Estimators
转换数据。这只是 a simple for loop which applies stages sequentially one by one.
Likewise, what happens if for one of the stages I don't specify an inputCol(s)?
几乎没有什么有趣的。由于阶段是按顺序应用的,并且唯一的模式验证是由给定的 Transformer
在实际转换开始之前使用其 transformSchema
方法应用的,因此它将像任何其他阶段一样处理。
What happens if a transformer adds two columns to a dataframe
同上。只要它为后续阶段生成有效的输入模式,它就与任何其他 Transformer
.
transformers don't look at the output of one another I was hoping I could run them in parallel
从理论上讲,您可以尝试构建一个自定义复合转换器,它封装了多个不同的转换,但唯一可以独立执行并从此类操作中受益的部分是模型拟合。在一天结束时,您必须 return 一个单一的转换 DataFrame
可以被下游阶段使用,实际的转换很可能被安排为一个单一的数据扫描。
问题仍然存在,是否真的值得付出努力。虽然可以同时执行多个作业,但如果可用资源量与处理单个作业所需的工作量相比相对较高,则它只能提供一些优势。它通常需要一些低级别的管理(分区数,随机分区数),这不是 Spark 的最强项 SQL。