PySpark Pandas UDF Best Practices

I wrote the following pandas_udf to compute the haversine distance in PySpark:

import math

import numpy as np
import pandas as pd

def haversine(witness_lat: float, witness_lon: float, beacon_lat: float, beacon_lon: float) -> float:
    # Called row-wise from DataFrame.apply, so the arguments are scalars, not Series.
    if None in [witness_lat, witness_lon, beacon_lat, beacon_lon]:
        return None

    lon1, lat1, lon2, lat2 = map(math.radians, [witness_lon, witness_lat, beacon_lon, beacon_lat])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    # Haversine formula; 6367000 m approximates the Earth's radius.
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    return 6367000 * c

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("float", PandasUDFType.SCALAR)
def udf_calc_distance(st_y_witness, st_x_witness, st_y_transmitter, st_x_transmitter):
    distance_df = pd.DataFrame({'st_y_witness': st_y_witness,
                                'st_x_witness': st_x_witness,
                                'st_y_transmitter': st_y_transmitter,
                                'st_x_transmitter': st_x_transmitter})
    distance_df['distance'] = distance_df.apply(
        lambda x: haversine(x['st_y_witness'], x['st_x_witness'],
                            x['st_y_transmitter'], x['st_x_transmitter']),
        axis=1)
    return distance_df['distance']

This code runs correctly and gives the answer I expect, but I get the deprecation warning shown below.

UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
  warnings.warn(

I looked at the latest pandas_udf documentation on Databricks here: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html but I'm not sure how to use the type hints with the apply formatting. I set up my code based on other examples I've seen on Stack Overflow, like this one, which follow the format that will be deprecated.

Thanks for your help!

Simply add the function type hints, just as you did for the haversine function:

@pandas_udf("float")
def udf_calc_distance(st_y_witness: pd.Series, st_x_witness: pd.Series,
                      st_y_transmitter: pd.Series, st_x_transmitter: pd.Series) -> pd.Series:
    distance_df = pd.DataFrame({'st_y_witness': st_y_witness,
                                'st_x_witness': st_x_witness,
                                'st_y_transmitter': st_y_transmitter,
                                'st_x_transmitter': st_x_transmitter})
    distance_df['distance'] = distance_df.apply(
        lambda x: haversine(x['st_y_witness'], x['st_x_witness'],
                            x['st_y_transmitter'], x['st_x_transmitter']),
        axis=1)
    return distance_df['distance']
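As an aside: since each argument already arrives inside the UDF as a pandas Series, the haversine math can be vectorized with NumPy directly, which avoids the per-row apply() entirely. A minimal sketch (the function name haversine_vec is my own, not from the original post):

```python
import numpy as np
import pandas as pd

def haversine_vec(lat1: pd.Series, lon1: pd.Series,
                  lat2: pd.Series, lon2: pd.Series) -> pd.Series:
    # Operate on whole Series at once; np.radians and the trig
    # functions broadcast element-wise over the columns.
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # 6367000 m approximates the Earth's radius, matching the scalar version.
    return pd.Series(2 * 6367000 * np.arcsin(np.sqrt(a)))
```

This body could be used as the pandas UDF itself (with the same `@pandas_udf("float")` decorator and Series type hints), so Spark's Arrow batching does the work instead of a Python-level loop.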

If you're looking for a description/documentation, see the "Examples" section of the pyspark pandas_udf docs:

From Spark 3.0 with Python 3.6+, Python type hints detect the function types as below:

>>> @pandas_udf(IntegerType())
... def slen(s: pd.Series) -> pd.Series:
...     return s.str.len()

Prior to Spark 3.0, the pandas UDF used functionType to decide the execution type as below:

>>> from pyspark.sql.functions import PandasUDFType
>>> from pyspark.sql.types import IntegerType
>>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
... def slen(s):
...     return s.str.len()

It is preferred to specify type hints for the pandas UDF instead of specifying pandas UDF type via functionType, which will be deprecated in future releases.

Note that the type hint should use pandas.Series in all cases but there is one variant that pandas.DataFrame should be used for its input or output type hint instead when the input or output column is of pyspark.sql.types.StructType.
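To illustrate that DataFrame variant: the underlying function takes and returns pandas.DataFrame when the corresponding Spark column is a struct. A sketch under assumed column names (`lat`/`lon` and the function `round_coords` are hypothetical; the decorator line is shown in a comment since it requires a pyspark installation):

```python
import pandas as pd

# With pyspark available, this would be registered as a pandas UDF
# whose input column is a StructType, e.g.:
#   from pyspark.sql.functions import pandas_udf
#   @pandas_udf("lat double, lon double")
def round_coords(coords: pd.DataFrame) -> pd.DataFrame:
    # The struct's fields arrive as DataFrame columns; return a
    # DataFrame with one column per field of the output struct.
    return pd.DataFrame({
        "lat": coords["lat"].round(4),
        "lon": coords["lon"].round(4),
    })
```

Every other case keeps pandas.Series hints, as in the `udf_calc_distance` example above.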