为 Spark Rows 定义新模式
Defining new schema for Spark Rows
我有一个 DataFrame,其中一列包含 JSON 的字符串。至此,我已经按照JavaRDD.map
方法的要求实现了Function
接口:Function<Row,Row>()
。在此函数中,我正在解析 JSON,并创建一个新行,其附加列来自 JSON 中的值。例如:
原始行:
+------+-----------------------------------+
| id | json |
+------+-----------------------------------+
| 1 | {"id":"abcd", "name":"dmux",...} |
+------------------------------------------+
应用我的函数后:
+------+----------+-----------+
| id | json_id | json_name |
+------+----------+-----------+
| 1 | abcd | dmux |
+-----------------+-----------+
我在尝试从返回的 JavaRDD 创建新的 DataFrame 时遇到了麻烦 运行。现在我有了这些新行,我需要创建一个模式。该架构高度依赖于 JSON 的结构,因此我试图找出一种将架构数据与 Row
对象一起从函数传回的方法。我不能使用 broadcast
变量,因为 SparkContext 没有传递到函数中。
除了在 Function
的调用者中循环遍历一行中的每一列,我还有什么选择?
您可以创建一个 StructType
。这是 Scala
,但它的工作方式相同:
val newSchema = StructType(Array(
StructField("id", LongType, false),
StructField("json_id", StringType, false),
StructField("json_name", StringType, false)
))
val newDf = sqlContext.createDataFrame(rdd, newSchema)
顺便说一句,您需要确保您的 rdd
是 RDD[Row]
类型。
我有一个 DataFrame,其中一列包含 JSON 的字符串。至此,我已经按照JavaRDD.map
方法的要求实现了Function
接口:Function<Row,Row>()
。在此函数中,我正在解析 JSON,并创建一个新行,其附加列来自 JSON 中的值。例如:
原始行:
+------+-----------------------------------+
| id | json |
+------+-----------------------------------+
| 1 | {"id":"abcd", "name":"dmux",...} |
+------------------------------------------+
应用我的函数后:
+------+----------+-----------+
| id | json_id | json_name |
+------+----------+-----------+
| 1 | abcd | dmux |
+-----------------+-----------+
我在尝试从返回的 JavaRDD 创建新的 DataFrame 时遇到了麻烦 运行。现在我有了这些新行,我需要创建一个模式。该架构高度依赖于 JSON 的结构,因此我试图找出一种将架构数据与 Row
对象一起从函数传回的方法。我不能使用 broadcast
变量,因为 SparkContext 没有传递到函数中。
除了在 Function
的调用者中循环遍历一行中的每一列,我还有什么选择?
您可以创建一个 StructType
。这是 Scala
,但它的工作方式相同:
val newSchema = StructType(Array(
StructField("id", LongType, false),
StructField("json_id", StringType, false),
StructField("json_name", StringType, false)
))
val newDf = sqlContext.createDataFrame(rdd, newSchema)
顺便说一句,您需要确保您的 rdd
是 RDD[Row]
类型。