How to debug pandas_udfs without having to use Spark?

If I'm using Python Transforms in Palantir Foundry and I'm trying to run an algorithm that uses in-memory/non-Spark libraries, I want it to scale automatically and run on Spark (not pandas). If I'm struggling to write the code and want to test and develop it locally, but later use the same code in pyspark, how do I do that?

As a concrete example, I want to calculate the area of a geojson column containing polygons. Since I need to use some libraries that are not native to Spark (shapely, pyproj), I understand the best way (performance-wise) to do this is with a pandas_udf (also known as streaming udfs or vectorized udfs). But after reading a few guides, specifically Introducing Pandas UDF for PySpark, pandas user-defined functions, and Modeling at Scale with Pandas UDFs (w/ code examples), it's still challenging to debug and get working, and it seems like I can't use break statements and there isn't a first-class way to log/print.

The actual dataframe will have millions of rows (corresponding to millions of polygons), but for simplicity I'd like to test locally with a small dataframe first, and scale up to the larger dataset later:

df = spark.createDataFrame(
    [
        ("AFG", "{\"type\":\"Polygon\",\"coordinates\":[[[61.210817,35.650072],[62.230651,35.270664],[62.984662,35.404041],[63.193538,35.857166],[63.982896,36.007957],[64.546479,36.312073],[64.746105,37.111818],[65.588948,37.305217],[65.745631,37.661164],[66.217385,37.39379],[66.518607,37.362784],[67.075782,37.356144],[67.83,37.144994],[68.135562,37.023115],[68.859446,37.344336],[69.196273,37.151144],[69.518785,37.608997],[70.116578,37.588223],[70.270574,37.735165],[70.376304,38.138396],[70.806821,38.486282],[71.348131,38.258905],[71.239404,37.953265],[71.541918,37.905774],[71.448693,37.065645],[71.844638,36.738171],[72.193041,36.948288],[72.63689,37.047558],[73.260056,37.495257],[73.948696,37.421566],[74.980002,37.41999],[75.158028,37.133031],[74.575893,37.020841],[74.067552,36.836176],[72.920025,36.720007],[71.846292,36.509942],[71.262348,36.074388],[71.498768,35.650563],[71.613076,35.153203],[71.115019,34.733126],[71.156773,34.348911],[70.881803,33.988856],[69.930543,34.02012],[70.323594,33.358533],[69.687147,33.105499],[69.262522,32.501944],[69.317764,31.901412],[68.926677,31.620189],[68.556932,31.71331],[67.792689,31.58293],[67.683394,31.303154],[66.938891,31.304911],[66.381458,30.738899],[66.346473,29.887943],[65.046862,29.472181],[64.350419,29.560031],[64.148002,29.340819],[63.550261,29.468331],[62.549857,29.318572],[60.874248,29.829239],[61.781222,30.73585],[61.699314,31.379506],[60.941945,31.548075],[60.863655,32.18292],[60.536078,32.981269],[60.9637,33.528832],[60.52843,33.676446],[60.803193,34.404102],[61.210817,35.650072]]]}"),  
        ("ALB", "{\"type\":\"Polygon\",\"coordinates\":[[[20.590247,41.855404],[20.463175,41.515089],[20.605182,41.086226],[21.02004,40.842727],[20.99999,40.580004],[20.674997,40.435],[20.615,40.110007],[20.150016,39.624998],[19.98,39.694993],[19.960002,39.915006],[19.406082,40.250773],[19.319059,40.72723],[19.40355,41.409566],[19.540027,41.719986],[19.371769,41.877548],[19.304486,42.195745],[19.738051,42.688247],[19.801613,42.500093],[20.0707,42.58863],[20.283755,42.32026],[20.52295,42.21787],[20.590247,41.855404]]]}"),
    ],  # can continue with more countries from https://raw.githubusercontent.com/johan/world.geo.json/34c96bba9c07d2ceb30696c599bb51a5b939b20f/countries.geo.json
    ["country", "geometry"]
)

Given that the geometry column is really geojson, how can I calculate its area in square meters using a sound GIS approach? For example, using the methods outlined in these questions:

Calculate Polygon area in planar units (e.g. square-meters) in Shapely

How do I get the area of a GeoJSON polygon with Python

How to calculate the area of a polygon on the earth's surface using python?

The way to think about pandas_udfs is that you are writing the logic that will be applied to a pandas Series. This means that you apply an operation and it is automatically applied to every row.
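As a trivial illustration of that mental model (a made-up example, not part of the area calculation): the function you write receives a whole pandas Series and returns a Series of the same length, usually by calling .apply or a vectorized operation on it:

import pandas as pd

s = pd.Series(["a", "bb", "ccc"])
# inside a scalar pandas_udf you would write exactly this kind of Series -> Series logic
lengths = s.apply(len)
print(lengths.tolist())  # [1, 2, 3]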

If you want to develop this locally, you can take a much smaller sample of the data (like the one you made), store it in a pandas Series, and get it working there:

from shapely.geometry import Polygon
import json
from pyproj import Geod

# select only the column you want to use in the pandas_udf
pdf = df.select("geometry").toPandas()

# convert to a pandas Series (.ix was removed from pandas; use .iloc instead)
pdf_geom_raw = pdf.iloc[:, 0]

# parse the geojson strings into dicts
pdf_geom = pdf_geom_raw.apply(json.loads)

# function using non-spark functions
def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    area = abs(geod.geometry_area_perimeter(poly)[0])
    return area
pdf_geom = pdf_geom.apply(get_area)

Here you can try this locally (without Spark) by replacing pdf = df.select("geometry").toPandas() with pdf = pd.read_csv("geo.csv").
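For example, a minimal Spark-free version of the same steps might look like this (a sketch only; it assumes a local geo.csv with a geometry column holding the geojson strings):

import json
import pandas as pd
from shapely.geometry import Polygon
from pyproj import Geod

# hypothetical local file with a "geometry" column of geojson strings
pdf = pd.read_csv("geo.csv")
pdf_geom_raw = pdf.loc[:, "geometry"]

# same parsing and area logic as above, no Spark involved
pdf_geom = pdf_geom_raw.apply(json.loads)

def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    return abs(geod.geometry_area_perimeter(poly)[0])

pdf["area_square_meters"] = pdf_geom.apply(get_area)
print(pdf[["area_square_meters"]])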

Now that you have it working locally, you can copy and paste the code into a pandas_udf:

from shapely.geometry import Polygon
import json
from pyproj import Geod
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def geodesic_polygon_area(pdf_geom): 
    pdf_geom = pdf_geom.apply(json.loads)
    def get_area(shape):
        geod = Geod(ellps="WGS84")
        poly = Polygon(shape["coordinates"][0])
        area = abs(geod.geometry_area_perimeter(poly)[0])
        return area

    pdf_geom = pdf_geom.apply(get_area)
    return pdf_geom

df = df.withColumn('area_square_meters', geodesic_polygon_area(df.geometry))

When running the code:

>>> df.show()
+-------+--------------------+--------------------+
|country|            geometry|  area_square_meters|
+-------+--------------------+--------------------+
|    AFG|{"type":"Polygon"...|6.522700837770404E11|
|    ALB|{"type":"Polygon"...|2.969479517410540...|
+-------+--------------------+--------------------+
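
One last debugging tip along the same lines: since get_area itself is plain Python, you can sanity-check it on a single hand-written polygon (no Spark, no pandas) before wiring it into the udf. A minimal sketch with a made-up 1°x1° square near the equator:

import json
from shapely.geometry import Polygon
from pyproj import Geod

def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    return abs(geod.geometry_area_perimeter(poly)[0])

# hypothetical test shape: a 1x1 degree square starting at (0, 0)
square = json.loads('{"type":"Polygon","coordinates":[[[0,0],[1,0],[1,1],[0,1],[0,0]]]}')
print(get_area(square))  # should be on the order of 1e10 square meters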