Spark mapInPandas 中有多少迭代器?
How many Iterators are there in Spark mapInPandas?
我想了解“mapInPandas”在 Spark 中的工作原理。
Databricks博客上引用的例子是:
from typing import Iterator
import pandas as pd
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(pandas_filter, schema=df.schema).show()
问题是,迭代器中将有多少个“pdf”?
我猜他们可能和分区的数量一样多
但是当我进一步测试代码时,它们似乎太多了(在具有约 100 米记录的不同数据集上)
那么有没有办法知道迭代次数是如何确定的以及
有没有办法让它等于分区数?
您可以在 documentation 中找到:
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
所以如果你有 1000 万条记录,你将有大约 10,000 个迭代器
我想了解“mapInPandas”在 Spark 中的工作原理。 Databricks博客上引用的例子是:
from typing import Iterator
import pandas as pd
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(pandas_filter, schema=df.schema).show()
问题是,迭代器中将有多少个“pdf”? 我猜他们可能和分区的数量一样多 但是当我进一步测试代码时,它们似乎太多了(在具有约 100 米记录的不同数据集上)
那么有没有办法知道迭代次数是如何确定的以及 有没有办法让它等于分区数?
您可以在 documentation 中找到:
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
所以如果你有 1000 万条记录,你将有大约 10,000 个迭代器