保存并加载 JSON 和 scala 的对象 on-top/with Spark

Save and load JSON and scala's objects on-top/with Spark

我在使用 spark 将文件读写到“远程”文件系统(例如 hadoop)时遇到问题。

内容

  1. 我在当地做了什么?
  2. 我想在 'remote' 做什么?

1。我在当地做了什么?

至于现在,我在本地使用 spark - 在我的设备上读取和写入文件,如下所示:

正在初始化 Spark 会话:

  val spark: SparkSession = Try(
    SparkSession.builder()
      .master("local[*]")
      .appName("app")
      .getOrCreate()) match {
    case Success(session)=>session
    case Failure(exception)=> throw new Exception(s"Failed initializing spark, due to: ${exception.getMessage}")
  }

Save/Write本地,然后Load/Read它:

(Json 文件)

  val content = "{"a": 10, "b": [], "c": {"x": "1", "z": {}}, {"x": "2", "z": {}}}"  // dummy JSON as string
  val fileName = "full_path/sample.json"

  // ... verify directory exists and create it if not ...

  // write sample.json with the content above:
  new PrintWriter(fileName) {
    write(content)
    close()
  }

  // Read & Operate on it:
  val jsonAsBufferedSource = Source.fromFile(fileName)

(Scala 的案例-Class)

  case class Dummy(string: String, i: Int) extends Serializable {}
  val content = Dummy("42 is the best number", 42)       // Dummy instance
  val fileName = "full_path/sample.dummy"               // 'dummy' is the serialized saved-object name.
  
  // ... verify directory exists and create it if not ...

  // Write it:
  val output = new ObjectOutputStream(new FileOutputStream(fileName))
  output.writeObject(content)
  output.close()

  // Read:
  val input = new ObjectInputStream(new FileInputStream(fileName))
  val dummyObject = input.readObject.asInstanceOf[Dummy]
  input.close()

  // Operate:
  dummyObject.i   // 42


2。我想在 'remote' 做什么?

我希望能够 read/write 在 HDFS、S3 或任何其他可用的 'remote' 文件系统上,使用 spark - 正如我在本地所做的那样。

主要是我的问题是:

一般来说 - 我想让自己 locally/remotly 使用与我的应用程序相同的“内部接口”。

感谢阅读!

编辑

我希望我的应用程序能够 save/read 文件到磁盘并在测试和调试时在我的计算机磁盘上运行。我希望在生产时使用远程文件系统 save/read。
是否可以使用相同的 spark 方法?使用什么 spark 配置?

奥伦

不确定我是否理解问题。 Spark 使用 file:// 和 hdfs:// 或 s3a:// 前缀都一样。错的是Source.fromFile和PrintWriter

您需要重写函数以使用正确的 Spark 方法,因为 Spark 旨在 运行 集群 ,而不是孤立于一台机器(参考作为 driver)

// read all JSON files in a folder
val df = spark.read.json("file:///path/to/full_path/")

// write the dataframe to HDFS folder
df.write.format("json").save("hdfs://namenode.fqdn:port/hdfs/path/")

当然,您可以序列化 class,“本地”写入文件(deploy-mode=cluster 时将是“远程”文件),然后上传 那个,但这看起来不像你在这里做的。而不是这样做,你会 parellelize 序列化 object.

Seq

使用 json4s 而不是 ObjectOutputStream 从 case-classes.

获取 JSON