为 Spark Rows 定义新模式

Defining new schema for Spark Rows

我有一个 DataFrame,其中一列包含 JSON 的字符串。至此,我已经按照JavaRDD.map方法的要求实现了Function接口:Function<Row,Row>()。在此函数中,我正在解析 JSON,并创建一个新行,其附加列来自 JSON 中的值。例如:

原始行:

+------+-----------------------------------+
|  id  |        json                       |
+------+-----------------------------------+
|  1   | {"id":"abcd", "name":"dmux",...}  |
+------------------------------------------+

应用我的函数后:

+------+----------+-----------+
|  id  | json_id  | json_name |
+------+----------+-----------+
|  1   | abcd     | dmux      |
+-----------------+-----------+

我在尝试从返回的 JavaRDD 创建新的 DataFrame 时遇到了麻烦 运行。现在我有了这些新行,我需要创建一个模式。该架构高度依赖于 JSON 的结构,因此我试图找出一种将架构数据与 Row 对象一起从函数传回的方法。我不能使用 broadcast 变量,因为 SparkContext 没有传递到函数中。

除了在 Function 的调用者中循环遍历一行中的每一列,我还有什么选择?

您可以创建一个 StructType。这是 Scala,但它的工作方式相同:

val newSchema = StructType(Array(
  StructField("id", LongType, false),
  StructField("json_id", StringType, false),
  StructField("json_name", StringType, false)
))

val newDf = sqlContext.createDataFrame(rdd, newSchema)

顺便说一句,您需要确保您的 rddRDD[Row] 类型。