
Reading Unzipped Shapefiles stored in AWS S3 from AWS EMR Cluster using PySpark in Jupyter Notebook

I am completely new to AWS EMR and Apache Spark. I am trying to assign GeoIDs to residential properties using a shapefile, but I am unable to read the shapefiles from my S3 bucket. Please help me understand what is going on, as I could not find any answer on the internet that explains the exact problem.

<!-- language: python 3.4 -->

import shapefile
import pandas as pd
from shapely.geometry import shape  # needed for the centroid computation below

def read_shapefile(shp_path):
    """
    Read a shapefile into a Pandas dataframe with a 'coords' column holding
    the geometry information. This uses the pyshp package.
    """
    # read the file, then parse out the records and shapes
    sf = shapefile.Reader(shp_path)
    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shps = [s.points for s in sf.shapes()]
    center = [shape(s).centroid.coords[0] for s in sf.shapes()]

    # write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df

read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10")

[Screenshot: the files that I want to read]

[Screenshot: the error that I'm getting while reading from the bucket]
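For contrast, the same function reads a local copy of one county's files without trouble; pyshp treats the string argument as a plain filesystem path, which seems to be why the s3a:// URL fails. A minimal sketch, assuming a hypothetical local copy of the files:

    # works when tl_2010_01001_tabblock10.shp/.shx/.dbf sit on local disk
    # (hypothetical local path, for contrast with the s3a:// call above)
    df = read_shapefile("/home/hadoop/tabblock/tl_2010_01001_tabblock10")
    df.head()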

I would really like to read these shapefiles on the AWS EMR cluster, since processing them individually on my local machine is not feasible. Any kind of help is appreciated.

Initially I was able to read my shapefiles from the S3 bucket as binary objects, then built a wrapper function around that, and finally passed the individual file objects for the .shp, .shx, and .dbf members to the shapefile.Reader() method.

This happens because PySpark cannot read formats that are not provided in the SparkContext. Found this link helpful: Using pyshp to read a file-like object from a zipped archive
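As a quick sanity check before parsing anything, sc.binaryFiles returns an RDD of (path, bytes) pairs, one pair per file matched by the glob, so you can confirm that the .shp, .shx, and .dbf members all come back from S3. A minimal sketch, assuming `sc` is the SparkContext the EMR notebook provides:

    # each element of the RDD is a (file path, file contents as bytes) pair
    blocks = sc.binaryFiles("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10*")
    print(blocks.keys().collect())
    # expect the .shp, .shx and .dbf paths (plus .prj/.xml if present)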

My solution

def read_shapefile(shp_path):

    import io
    import shapefile
    import pandas as pd
    from shapely.geometry import shape

    # read every file matched by shp_path as (path, bytes) pairs;
    # `sc` is the SparkContext that the EMR notebook provides
    blocks = sc.binaryFiles(shp_path)
    block_dict = dict(blocks.collect())

    # hand the .shp, .shx and .dbf members to pyshp as in-memory buffers
    def member(ext):
        return io.BytesIO(next(v for k, v in block_dict.items() if k.endswith(ext)))

    sf = shapefile.Reader(shp=member(".shp"), shx=member(".shx"), dbf=member(".dbf"))

    # parse out the records and shapes
    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shps = [s.points for s in sf.shapes()]
    center = [shape(s).centroid.coords[0] for s in sf.shapes()]

    # write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df

block_shapes = read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10*")

This works fine, without breaking.
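With the blocks in a dataframe, the original goal (attaching a GeoID to a residential property) can be sketched as a point-in-polygon lookup. This is only an illustration: the GEOID10 column name is assumed from the 2010 tabblock schema, the coordinates are made up, and each block's point list is treated as a single exterior ring, which holds for simple polygons only.

    from shapely.geometry import Point, Polygon

    def assign_geoid(blocks_df, lon, lat):
        """Return the GEOID10 of the first census block whose polygon contains the point."""
        pt = Point(lon, lat)
        for _, row in blocks_df.iterrows():
            if Polygon(row["coords"]).contains(pt):
                return row["GEOID10"]
        return None

    # hypothetical property location somewhere in Autauga County, AL (FIPS 01001)
    print(assign_geoid(block_shapes, -86.64, 32.53))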