Tree/nested 关系数据模型中的 Spark 结构

Tree/nested structures in Spark from relational data model

如果我理解正确,我可以将 spark 数据集视为 objects 类型 T 的列表。如何以 parent 包含 children 列表的方式连接两个数据集?但是 child 也会有自己的列表 children...

一种方法是根据密钥对 children 进行 groupBy,但 collect_list returns 只有一列,我想是更好的方法。

想要的结果基本上是 Customer 类型的数据集(客户列表 objects?),但添加了:

最终结果会是这样的

case class Customer(customer_id: Int, name: String, address: String, age: Int, invoices: List[Invoices])
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String, items: List[Items])

对于这个结果,我需要来自以下输入:

case class Customer(customer_id: Int, name: String, address: String, age: Int)
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String)
case class InvoiceItem(item_id: Int, invoice_id: Int, num_of_items: Int, price: Double, total: Double)

    val customers_df = Seq(
       (11,"customer1", "address1", 10, "F")
      ,(12,"customer2", "address2", 20, "M")
      ,(13,"customer3", "address3", 30, "F")
    ).toDF("customer_id", "name", "address", "age", "sex")
    val customers_ds = customers_df.as[Customer].as("c")

    customers_ds.show

    val invoices_df = Seq(
       (21,11, "10101/1", 20181105, "manual")
      ,(22,11, "10101/2", 20181105, "manual")
      ,(23,11, "10101/3", 20181105, "manual")
      ,(24,12, "10101/4", 20181105, "generated")
      ,(25,12, "10101/5", 20181105, "pos")
    ).toDF("invoice_id", "customer_id", "invoice_num", "date", "invoice_type")
    val invoices_ds = invoices_df.as[Invoice].as("i")

    invoices_ds.show

    val invoice_items_df = Seq(
       (31, 21, 5, 10.0, 50.0)
      ,(32, 21, 3, 15.0, 45.0)
      ,(33, 22, 6, 11.0, 66.0)
      ,(34, 22, 7, 2.0, 14.0)
      ,(35, 23, 1, 100.0, 100.0)
      ,(36, 24, 4, 4.0, 16.0)
    ).toDF("item_id", "invoice_id", "num_of_items", "price", "total")
    val invoice_items_ds = invoice_items_df.as[InvoiceItem].as("ii")

    invoice_items_ds.show

在表格中它看起来像这样:

+-----------+---------+--------+---+---+
|customer_id|     name| address|age|sex|
+-----------+---------+--------+---+---+
|         11|customer1|address1| 10|  F|
|         12|customer2|address2| 20|  M|
|         13|customer3|address3| 30|  F|
+-----------+---------+--------+---+---+

+----------+-----------+-----------+--------+------------+
|invoice_id|customer_id|invoice_num|    date|invoice_type|
+----------+-----------+-----------+--------+------------+
|        21|         11|    10101/1|20181105|      manual|
|        22|         11|    10101/2|20181105|      manual|
|        23|         11|    10101/3|20181105|      manual|
|        24|         12|    10101/4|20181105|   generated|
|        25|         12|    10101/5|20181105|         pos|
+----------+-----------+-----------+--------+------------+

+-------+----------+------------+-----+-----+
|item_id|invoice_id|num_of_items|price|total|
+-------+----------+------------+-----+-----+
|     31|        21|           5| 10.0| 50.0|
|     32|        21|           3| 15.0| 45.0|
|     33|        22|           6| 11.0| 66.0|
|     34|        22|           7|  2.0| 14.0|
|     35|        23|           1|100.0|100.0|
|     36|        24|           4|  4.0| 16.0|
+-------+----------+------------+-----+-----+

您似乎正在尝试将规范化数据读入 Scala 对象树中。您当然可以使用 Spark 做到这一点,但 Spark 可能不是最佳工具。如果数据足够小以适合内存,我认为你的问题是正确的,对象关系映射 (ORM) 库可能更适合这项工作。

如果您仍想使用 Spark,那么 groupBycollect_list 是您的正确选择。您缺少的是 struct() 函数。

case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)

val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))

case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])

customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))
      .withColumnRenamed("customer_id", "id"), 
    Seq("id"), "left_outer")
  .as[CombinedCustomer]
  .show

struct('*) 从整行构建一个 StructType 列。您还可以选择任何列,例如 struct('x.as("colA"), 'colB).

这会产生

+---+----------------+
| id|        invoices|
+---+----------------+
|  1|[[1, 1], [2, 1]]|
+---+----------------+

现在,如果客户数据不适合内存,即使用简单的 collect 不是一种选择,您可以采取多种不同的策略。

最简单的,也是您应该考虑的而不是收集到驱动程序的方法,要求独立处理每个客户的数据是可以接受的。在这种情况下,请尝试使用 map 并将每个客户的处理逻辑分发给工作人员。

如果不能接受客户的独立处理,一般策略如下:

  1. 使用上述方法根据需要将数据聚合到结构化行中。

  2. 对数据重新分区以确保处理所需的所有内容都在一个分区中。

  3. (可选)sortWithinPartitions 以确保分区内的数据按您的需要排序。

  4. 使用mapPartitions.

您可以使用 Spark-SQL 并为客户、发票和项目各有一个数据集。 然后您可以简单地在这些数据集之间使用连接和聚合函数来获得所需的输出。

Spark SQL 在 sql 风格和编程方式之间的性能差异可以忽略不计。