Sparklyr:使用调用方法列出 R 中目录的内容

Sparklyr: List contents of directory in R using invoke methods

无法找到用于通过 Spark 列出目录内容的内置 sparklyr,我正在尝试使用 invoke:

sc <- spark_connect(master = "yarn", config=config)
path <- 'gs:// ***path to bucket on google cloud*** '
spath <- sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', path) 
fs <- sparklyr::invoke(spath, 'getFileSystem')
list <- sparklyr:: invoke(fs, 'listLocatedStatus') 
Error: java.lang.Exception: No matched method found for class org.apache.hadoop.fs.Path.getFileSystem
    at sparklyr.Invoke.invoke(invoke.scala:134)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66) ...

注意:是否有分布式代码可重现示例的指南?考虑到我 运行 针对特定的 Spark 环境,我不知道如何举一个其他人可以效仿的例子。

getFileSystem方法takesorg.apache.hadoop.conf.Configuration对象作为第一个参数:

public FileSystem getFileSystem(Configuration conf)
                     throws IOException

Return the FileSystem that owns this Path.

Parameters:

conf - the configuration to use when resolving the FileSystem

所以检索 FileSystem 实例的代码应该大致如下所示:

# Retrieve Spark's Hadoop configuration
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
fs <- sparklyr::invoke(spath, 'getFileSystem', hconf)

另外listLocatedStatustakes either Path

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                     throws FileNotFoundException,
                                                                            IOException

Path and PathFilter(注意这个实现是protected):

public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
                                                                    throws FileNotFoundException,
                                                                            IOException

因此,如果您想按照上面所示构建代码,则必须至少提供一个路径

sparklyr:: invoke(fs, "listLocatedStatus", spath)

实际上,直接获取 FileSystem 可能更容易:

fs <- invoke_static(sc, "org.apache.hadoop.fs.FileSystem", "get",  hconf)

并使用globStatus

lls <- invoke(fs, "globStatus", spath)

其中 spath 是带通配符的路径,例如:

sparklyr::invoke_new(sc, 'org.apache.hadoop.fs.Path', "/some/path/*")

结果将是一个 R list,可以轻松迭代:

lls  %>%
    purrr::map(function(x) invoke(x, "getPath") %>% invoke("toString"))

学分

The answer to How can one list all csv files in an HDFS location within the Spark Scala shell? by @jaime

备注:

  • 一般来说,如果您与非平凡的 Java API 交互,用 Java 或 Scala 编写代码并提供最小的 R 接口更有意义.
  • 对于与特定文件对象存储的交互,使用专用包可能更容易。对于 Google 云存储,您可以查看 googleCloudStorageR