如何在地理上过滤 PySpark 中的条目?

How to geographically filter entries in PySpark?

我有一个时间段内 Lat/Lon 格式的用户位置数据集,我想使用 GIS 函数过滤条目。例如,在多边形内查找条目(ST_Contains 来自 GIS 世界)并使用 ESRI geodatabase 文件添加一列,该列是用户条目所在的地区。

我在网上搜索并找到 Magellan, but the Python support is not working at this time. I have also found Hive support for GIS functions in Esri Spatial,但没有找到有关如何在启动 PySpark 时加载正确的包或如何在 PySpark 中注册所需函数的文档 shell : (ST_Polygon, ST_Contains, 等等...).

还有其他我应该考虑的选择吗?我正在使用 Azure 的 HDInsight,所以我可以访问 PySpark shell:

中的 HiveContext 对象
>>> sqlContext
<pyspark.sql.context.HiveContext object at 0x7f3294093b10>

示例数据集:

| Timestamp| User| Latitude|Longitude| |1462838468|49B4361512443A4DA...|39.777982|-7.054599| |1462838512|49B4361512443A4DA...|39.777982|-7.054599| |1462838389|49B4361512443A4DA...|39.777982|-7.054599| |1462838497|49B4361512443A4DA...|39.777982|-7.054599| |1465975885|6E9E0581E2A032FD8...|37.118362|-8.205041| |1457723815|405C238E25FE0B9E7...|37.177322|-7.426781| |1457897289|405C238E25FE0B9E7...|37.177922|-7.447443| |1457899229|405C238E25FE0B9E7...|37.177922|-7.447443| |1457972626|405C238E25FE0B9E7...| 37.18059| -7.46128| |1458062553|405C238E25FE0B9E7...|37.177322|-7.426781| |1458241825|405C238E25FE0B9E7...|37.178172|-7.444512| |1458244457|405C238E25FE0B9E7...|37.178172|-7.444512| |1458412513|405C238E25FE0B9E7...|37.177322|-7.426781| |1458412292|405C238E25FE0B9E7...|37.177322|-7.426781| |1465197963|6E9E0581E2A032FD8...|37.118362|-8.205041| |1465202192|6E9E0581E2A032FD8...|37.118362|-8.205041| |1465923817|6E9E0581E2A032FD8...|37.118362|-8.205041| |1465923766|6E9E0581E2A032FD8...|37.118362|-8.205041| |1465923748|6E9E0581E2A032FD8...|37.118362|-8.205041| |1465923922|6E9E0581E2A032FD8...|37.118362|-8.205041|

您可以将任何 python 库与 Spark 一起使用,无需特定于 Spark 的库。一些随机搜索推荐的一些 GIS python 库位于 http://spatialdemography.org/essential-python-geospatial-libraries/

您必须安装要使用的库。可以在此处找到有关如何安装库的说明:

然后,只需使用任何库向您的 RDD 添加一列,如下所示:

from my_gis_library_of_choice import in_polygon, district

text_lines = sc.textFile('wasb:///mydataset')
split = text_lines.map(lambda line: line.split('|'))
with_extra_columns = split.map(lambda r: r.append(in_polygon(r[2], r[3])).append(district(r[2], r[3])))