
Why Iterator of Series to Iterator of Series Pandas UDF (PandasUDFType.SCALAR_ITER) when Series to Series (PandasUDFType.SCALAR) is available?

There are different kinds of PandasUDFType depending on the input and output types of the function.

There are:

Series to Series PandasUDFType.SCALAR:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

spark.range(10).select(pandas_plus_one("id")).show()

And Iterator of Series to Iterator of Series PandasUDFType.SCALAR_ITER:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
    return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()

Could you please give me a simple use case that cannot be solved by the Series to Series PandasUDFType.SCALAR UDF but can be solved by the Iterator of Series to Iterator of Series PandasUDFType.SCALAR_ITER UDF? I can't seem to understand why the latter is needed when the former is already available.

According to the official documentation and the Databricks docs, these two types of Pandas UDFs are quite similar but differ in some aspects. In addition to the input and output type differences, an Iterator of Series to Iterator of Series UDF can only take a single column as input, whereas a Scalar UDF can take multiple input columns. To make an Iterator UDF take multiple Spark columns, you need to use an Iterator of multiple Series to Iterator of Series UDF, which is essentially the same as the Iterator of Series to Iterator of Series UDF but takes an iterator of tuples of pd.Series as its argument.
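For reference, here is a minimal sketch of that multiple-series variant in the Spark 3.0 type-hint style (the function name and columns are just illustrative): the UDF receives an iterator of tuples holding one pd.Series per input column.

from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def multiply_two_cols(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # each element of the iterator is one tuple of batches, one per input column
    for a, b in iterator:
        yield a * b

df = spark.createDataFrame([(1, 2), (3, 4)], ("x", "y"))
df.select(multiply_two_cols("x", "y")).show()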

Iterator UDFs are said to be useful when:

  • You need to prefetch the input iterator:
import queue
import threading

@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    q = queue.Queue()
    # consume is a placeholder that drains the iterator into q and puts None as an end sentinel
    threading.Thread(target=consume, args=(iterator, q), daemon=True).start()  # prefetch the iterator
    for s in iter(q.get, None):
        yield func(s)
  • You need some expensive state initialization that should run once before processing the batches (a concrete sketch follows this list):
@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    s = some_initialization()  # initialize the state once per iterator
    for x in iterator:
        yield func(x, s)  # reuse the same state for every batch
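A concrete instance of the second case, sketched with a hypothetical load_model helper: loading a model file is expensive, and the iterator form pays that cost once per stream of batches instead of once per batch, which a plain Scalar UDF cannot express.

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def predict(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model("/path/to/model")  # hypothetical loader; runs once per iterator
    for batch in iterator:
        # score each Arrow batch with the already-loaded model
        yield pd.Series(model.predict(batch))

Applying it looks the same as any other Pandas UDF, e.g. df.select(predict("value")) on a DataFrame with a numeric "value" column.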

However, this quote from the documentation causes some confusion, because it states that internally it works identically to the Series to Series case:

It is also useful when the UDF execution requires initializing some states although internally it works identically as Series to Series case
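For contrast, a sketch of the same initialization written as a Series to Series UDF (using the same some_initialization and func placeholders as above): Spark invokes a scalar UDF once per batch, so the initialization re-runs for every batch. My reading is that the "internally identical" part of the quote refers to how the data is split into Arrow batches and transferred, not to how often your function body runs; the iterator variant wraps the whole stream of batches in a single call, which is what makes one-time setup possible.

@pandas_udf("long")
def do_something_scalar(v: pd.Series) -> pd.Series:
    s = some_initialization()  # re-runs on every batch: the cost the iterator form avoids
    return func(v, s)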