如何在单个火花作业中摄取不同的火花数据帧

How to ingest different spark dataframes in a single spark job

我想在处理不同输入源但使用尽可能少的计算资源的 spark 中编写 ETL 管道,并且在使用 'traditional' spark ETL 方法时遇到问题。

我有许多流数据源需要保存到 DeltaLake tables 中。每个数据源只是 s3 中带有 avro 文件的文件夹。每个数据源都有不同的模式。每个数据源都应该持久保存到它自己的 DeltaLake table 中。除了 avro -> delta 之外几乎不需要任何转换,只需要使用从文件名派生的一些额外字段进行充实。 新文件以适中的速度添加,从一分钟一次到一天一次,具体取决于数据源。当新数据到达时,我有一个 kafka 通知,描述了什么样的数据和 s3 文件路径。

假设有两个数据源 - A 和 B。A 是 s3://bucket/A/* 文件,B - s3://bucket/B/*。每当添加新文件时,我都会收到一条带有有效负载 {'datasource': 'A', 文件名: 's3://bucket/A/file1', ... 其他字段} 的 kafka 消息。 A 文件应该转到 delta table s3://delta/A/, B - s3://delta/B/

如何在单个 Spark 应用程序中以最小的延迟摄取它们? 随着需要的数据不断涌现,听起来像流媒体。但是在 Spark Streaming 中,需要预先定义流模式,而我有不同的源和不同的模式,事先不知道。

为每个数据源启动一个专用的 spark 应用程序不是一种选择 - 有 100 多个数据源和非常小的文件到达。拥有 100 多个 Spark 应用程序是在浪费金钱。所有这些都应该使用中等大小的单个集群来摄取。

我现在唯一的想法是:在驱动程序进程中 运行 一个普通的 kafka 消费者,为每条记录读取一个数据帧,用额外的字段丰富并坚持它的增量 table。更多并行性 - 消耗多条消息并 运行 它们在未来,因此多个作业 运行 并发。 一些伪代码,在驱动进程中:

val consumer = KafkaConsumer(...)
consumer.foreach{record =>
    val ds = record.datasource
    val file = record.filename
    val df = spark.read.format(avro).load(file)
        .withColumn('id', record.id)
    val dest = s"s3://delta/${record.datasourceName}"
    df.write.format('delta').save(dest)
    consumer.commit(offset from record)
}

听起来不错(而且 PoC 表明它有效),但我想知道是否还有其他选择?任何其他想法表示赞赏。 DataBricks 平台中的 Spark 运行s。

Spark 不限制您在每次数据源摄取时都拥有一个 Spark 应用程序,您可以将数据源分组到几个 Spark 应用程序中,或者您可以为所有数据源使用一个 Spark 应用程序,如果 Spark应用程序有足够的资源来摄取和处理所有数据源。

您可以这样做:

object StreamingJobs extends SparkApp {

  // consume from Kafka Topic 1
  StreamProcess_1.runStream(spark)

  // consume from Kafka Topic 2
  StreamProcess_2.runStream(spark)

  //  consume from Kafka Topic n
  StreamProcess_N.runStream(spark)

  // wait until termination
  spark.streams.awaitAnyTermination()

}

也许还有另一个用于批处理的 spark 作业

object BatchedJobs extends SparkApp {

  // consume from data source 1
  BatchedProcess_1.run(spark)

  // consume from  data source 2
  BatchedProcess_2.run(spark)

  //  consume from  data source n
  BatchedProcess_N.run(spark) 

}