How to debug pandas_udfs without having to use Spark?
If I'm using Python transforms in Palantir Foundry and trying to run an algorithm that relies on in-memory/non-Spark libraries, I want it to scale automatically and run in Spark (not pandas). If I'm stuck writing the code and want to test and develop it locally, but then use the same code in pyspark later, how do I do that?
As a concrete example, I want to compute the area of a geojson column containing polygons. Since I need to use some libraries that aren't native to Spark (shapely and pyproj), I know the best approach (performance-wise) is a pandas_udf (also known as streaming udfs or vectorized udfs). But after reading several guides, specifically Introducing Pandas UDF for PySpark, pandas user-defined functions, and Modeling at Scale with Pandas UDFs w/code examples, it's still challenging to debug and get working, and it seems like I can't use break statements and there isn't a first-class way to log/print.
The actual dataframe will have millions of rows (relating to millions of polygons), but for simplicity I want to test locally with a small dataframe that will later scale to the larger dataset:
df = spark.createDataFrame(
[
("AFG", "{\"type\":\"Polygon\",\"coordinates\":[[[61.210817,35.650072],[62.230651,35.270664],[62.984662,35.404041],[63.193538,35.857166],[63.982896,36.007957],[64.546479,36.312073],[64.746105,37.111818],[65.588948,37.305217],[65.745631,37.661164],[66.217385,37.39379],[66.518607,37.362784],[67.075782,37.356144],[67.83,37.144994],[68.135562,37.023115],[68.859446,37.344336],[69.196273,37.151144],[69.518785,37.608997],[70.116578,37.588223],[70.270574,37.735165],[70.376304,38.138396],[70.806821,38.486282],[71.348131,38.258905],[71.239404,37.953265],[71.541918,37.905774],[71.448693,37.065645],[71.844638,36.738171],[72.193041,36.948288],[72.63689,37.047558],[73.260056,37.495257],[73.948696,37.421566],[74.980002,37.41999],[75.158028,37.133031],[74.575893,37.020841],[74.067552,36.836176],[72.920025,36.720007],[71.846292,36.509942],[71.262348,36.074388],[71.498768,35.650563],[71.613076,35.153203],[71.115019,34.733126],[71.156773,34.348911],[70.881803,33.988856],[69.930543,34.02012],[70.323594,33.358533],[69.687147,33.105499],[69.262522,32.501944],[69.317764,31.901412],[68.926677,31.620189],[68.556932,31.71331],[67.792689,31.58293],[67.683394,31.303154],[66.938891,31.304911],[66.381458,30.738899],[66.346473,29.887943],[65.046862,29.472181],[64.350419,29.560031],[64.148002,29.340819],[63.550261,29.468331],[62.549857,29.318572],[60.874248,29.829239],[61.781222,30.73585],[61.699314,31.379506],[60.941945,31.548075],[60.863655,32.18292],[60.536078,32.981269],[60.9637,33.528832],[60.52843,33.676446],[60.803193,34.404102],[61.210817,35.650072]]]}"),
("ALB", "{\"type\":\"Polygon\",\"coordinates\":[[[20.590247,41.855404],[20.463175,41.515089],[20.605182,41.086226],[21.02004,40.842727],[20.99999,40.580004],[20.674997,40.435],[20.615,40.110007],[20.150016,39.624998],[19.98,39.694993],[19.960002,39.915006],[19.406082,40.250773],[19.319059,40.72723],[19.40355,41.409566],[19.540027,41.719986],[19.371769,41.877548],[19.304486,42.195745],[19.738051,42.688247],[19.801613,42.500093],[20.0707,42.58863],[20.283755,42.32026],[20.52295,42.21787],[20.590247,41.855404]]]}"),
],# can continue with more countries from https://raw.githubusercontent.com/johan/world.geo.json/34c96bba9c07d2ceb30696c599bb51a5b939b20f/countries.geo.json
["country", "geometry"]
)
Given that the geometry column is actually geojson, how can I compute its area in square meters using sound GIS methods? For example, using the approaches outlined in these questions:
Calculate Polygon area in planar units (e.g. square-meters) in Shapely
How do I get the area of a GeoJSON polygon with Python
How to calculate the area of a polygon on the earth's surface using python?
The way to think about pandas_udfs is that you write the logic you want applied to a pandas Series. That means you apply an operation and it is automatically applied to every row.
If you want to develop this locally, you can take a much smaller sample of the data (as you did), store it in a pandas Series, and get it working there:
from shapely.geometry import Polygon
import json
from pyproj import Geod

# select just the column you want to use in the pandas_udf
pdf = df.select("geometry").toPandas()
# convert to a pandas Series (.ix was removed from pandas; use .iloc)
pdf_geom_raw = pdf.iloc[:, 0]
# parse the geojson strings into dicts
pdf_geom = pdf_geom_raw.apply(json.loads)

# function using non-Spark libraries
def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    area = abs(geod.geometry_area_perimeter(poly)[0])
    return area

pdf_geom = pdf_geom.apply(get_area)
Here you can try it out locally (without Spark) by replacing pdf = df.select("geometry").toPandas() with pdf = pd.read_csv("geo.csv").
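For instance, here is a minimal, fully local sketch of the same develop-then-apply loop. It uses a hypothetical stand-in function `count_vertices` in place of the geodesic area so it runs with only pandas and the standard library; inside plain Python like this, prints and breakpoints work normally:

```python
import json
import pandas as pd

# Tiny sample with the same shape as the Spark "geometry" column
pdf_geom_raw = pd.Series([
    '{"type":"Polygon","coordinates":[[[0,0],[1,0],[1,1],[0,1],[0,0]]]}',
])

def count_vertices(shape):
    # Plain Python: set breakpoints or print() freely while developing
    return len(shape["coordinates"][0])

pdf_geom = pdf_geom_raw.apply(json.loads).apply(count_vertices)
print(pdf_geom.iloc[0])  # 5
```

Once this row-wise logic behaves as expected, the same two `.apply` calls move unchanged into the pandas_udf body.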
Now that you have it running locally, you can copy-paste the code into a pandas_udf:
from shapely.geometry import Polygon
import json
from pyproj import Geod
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def geodesic_polygon_area(pdf_geom):
    pdf_geom = pdf_geom.apply(json.loads)
    def get_area(shape):
        geod = Geod(ellps="WGS84")
        poly = Polygon(shape["coordinates"][0])
        area = abs(geod.geometry_area_perimeter(poly)[0])
        return area
    pdf_geom = pdf_geom.apply(get_area)
    return pdf_geom

df = df.withColumn('area_square_meters', geodesic_polygon_area(df.geometry))
When running the code:
>>> df.show()
+-------+--------------------+--------------------+
|country| geometry| area_square_meters|
+-------+--------------------+--------------------+
| AFG|{"type":"Polygon"...|6.522700837770404E11|
| ALB|{"type":"Polygon"...|2.969479517410540...|
+-------+--------------------+--------------------+
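As a rough sanity check on results like these without shapely or pyproj, you can use the standard spherical shoelace formula. This is a sketch that assumes a mean Earth radius, not the geodesic WGS84 computation above, so expect values within a percent or so of the pandas_udf output:

```python
import math

def spherical_polygon_area(ring, radius=6371008.8):
    """Approximate area (m^2) of a lon/lat ring on a sphere.

    ring: list of (lon, lat) pairs in degrees (closing point optional).
    Uses the spherical shoelace formula; a sanity check,
    not a geodesic (WGS84 ellipsoid) result.
    """
    area = 0.0
    for (lon1, lat1), (lon2, lat2) in zip(ring, ring[1:] + ring[:1]):
        area += math.radians(lon2 - lon1) * (
            2 + math.sin(math.radians(lat1)) + math.sin(math.radians(lat2))
        )
    return abs(area) * radius ** 2 / 2

# 1 degree x 1 degree square at the equator: roughly 111 km x 111 km
print(spherical_polygon_area([(0, 0), (1, 0), (1, 1), (0, 1)]))  # ~1.24e10
```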