如何将 Spark 数据帧转换为 Pandas 并返回 Kedro?
How to convert Spark data frame to Pandas and back in Kedro?
我试图了解在 Kedro 中将来自一个节点的 Spark 数据帧转换为另一个节点所需的 Pandas 的最佳方式是什么,而不创建冗余转换步骤。
Kedro 目前支持 2 种策略:
使用 Transcoding 功能
这需要为同一个数据集定义两个 DataCatalog
条目,在您的 catalog.yml
:
my_dataframe@spark:
type: kedro.contrib.io.pyspark.SparkDataSet
filepath: data/02_intermediate/data.parquet
my_dataframe@pandas:
type: ParquetLocalDataSet
filepath: data/02_intermediate/data.parquet
然后像这样在管道中使用它们:
Pipeline([
node(my_func1, "spark_input", "my_dataframe@spark"),
node(my_func2, "my_dataframe@pandas", "output"),
])
在这种情况下,kedro
理解 my_dataframe
在这两种情况下是相同的数据集,并正确解析节点执行顺序。同时,kedro
会使用 SparkDataSet
实现来保存,ParquetLocalDataSet
来加载,所以第一个节点应该输出 pyspark.sql.DataFrame
,而第二个节点会收到一个 pandas.Dataframe
.
使用 Pandas to Spark and Spark to Pandas 个节点装饰器
注意: Spark <-> Pandas
内存中转换是 因为它的内存需求,所以只有当数据帧已知时,这是一个可行的选择要小。
可以按照文档装饰节点:
from spark import get_spark
from kedro.contrib.decorators import pandas_to_spark
@pandas_to_spark(spark_session)
def my_func3(data):
data.show() # data is pyspark.sql.DataFrame
甚至整个管道:
Pipeline([
node(my_func4, "pandas_input", "some_output"),
...
]).decorate(pandas_to_spark)
我试图了解在 Kedro 中将来自一个节点的 Spark 数据帧转换为另一个节点所需的 Pandas 的最佳方式是什么,而不创建冗余转换步骤。
Kedro 目前支持 2 种策略:
使用 Transcoding 功能
这需要为同一个数据集定义两个 DataCatalog
条目,在您的 catalog.yml
:
my_dataframe@spark:
type: kedro.contrib.io.pyspark.SparkDataSet
filepath: data/02_intermediate/data.parquet
my_dataframe@pandas:
type: ParquetLocalDataSet
filepath: data/02_intermediate/data.parquet
然后像这样在管道中使用它们:
Pipeline([
node(my_func1, "spark_input", "my_dataframe@spark"),
node(my_func2, "my_dataframe@pandas", "output"),
])
在这种情况下,kedro
理解 my_dataframe
在这两种情况下是相同的数据集,并正确解析节点执行顺序。同时,kedro
会使用 SparkDataSet
实现来保存,ParquetLocalDataSet
来加载,所以第一个节点应该输出 pyspark.sql.DataFrame
,而第二个节点会收到一个 pandas.Dataframe
.
使用 Pandas to Spark and Spark to Pandas 个节点装饰器
注意: Spark <-> Pandas
内存中转换是
可以按照文档装饰节点:
from spark import get_spark
from kedro.contrib.decorators import pandas_to_spark
@pandas_to_spark(spark_session)
def my_func3(data):
data.show() # data is pyspark.sql.DataFrame
甚至整个管道:
Pipeline([
node(my_func4, "pandas_input", "some_output"),
...
]).decorate(pandas_to_spark)