Json 在推荐引擎的 Scala 中的 Dataframe 字段中
Json in a Dataframe field in Scala in a recommender engine
我正在尝试构建一个基于销售自家产品的网上商店的推荐引擎。
对于示例,我将保持这个简单。使用 scala 和 Spark。
我有一个包含 4 个字段的数据框。
1. A unique ID (INT)
2. A ProductName (String)
3. A ProductPrice (Number)
4. ProductCategories (Json field)
我还有第二个数据框,其中包含这些产品的销售情况。
不管怎样,我在看一个电影推荐引擎的例子,它相当简单。使用电影镜头数据集。我希望将其转换为此处的产品示例。
但是,每个产品的 ProductCategories 可能如下所示:
例如
[{'id': 28, 'type': 'Home'}, {'id': 18, 'type': 'Kitchen'}, {'id': 53, 'type': 'Living'}]
[{'id': 28, 'type': 'Home'}, {'id': 23, 'type': 'Bathroom'}]
当我将数据加载到数据框中时,它是一个 json 字符串。
这是一些数据框代码:
val ProductsDF2 = ProductsDF1_temp1.select(ProductsDF1_temp1.col("id"), ProductsDF1_temp1.col("product_name"), ProductsDF1_temp1.col("product_price"), ProductsDF1_temp1.col("product_categories"))
如何操作代码,以便数据框将类别从 product_categories (json) 中提取到它们自己的列中,如下所示:
|编号 |产品名称 |产品价格 |首页(布尔值) |厨房(布尔)|生活(布尔)| ...等等
我认为这是我需要实现的目标,才能让推荐引擎正常工作。
我相信有点像一种热门编码。
如有任何建议,我们将不胜感激。我对此有点陌生。
谢谢
缺点
希望你觉得这有用
// Let us assume that the dataframe is defined as a variable df
// we need to parse the string json array data in column_name column which is available as a json string
df.show
// output of df
+----------------------------------------------------+
|column_name |
+----------------------------------------------------+
|[{"id":28,"type":"Home"},{"id":18,"type":"Kitchen"}]|
+----------------------------------------------------+
df.printSchema()
root
|-- column_name: string (nullable = true)
// Following is my answer
import org.apache.spark.sql.functions._
import org.json.JSONArray
// This udf converts the json array to an array of json string
val toArray = udf { (data: String) => {
val jsonArray = new JSONArray(data)
var arr: Array[String] = Array()
val objects = (0 until jsonArray.length).map(x => jsonArray.getJSONObject(x))
objects.foreach { elem =>
arr :+= elem.toString
}
arr
}
}
val df1 = df.withColumn("column_name", toArray(col("column_name")))
df1.printSchema()
root
|-- column_name: array (nullable = true)
| |-- element: string (containsNull = true)
val df2 = df1.withColumn("column_name", explode(col("column_name")))
// here we are extracting the data from the json string using the schema of the json string data
val schema = spark.read.json(df1.select("column_name").rdd.map(x => x(0).toString)).schema
df2.withColumn("column_name", from_json(col("column_name"), schema))
.select(col("column_name.*"))
.show(false)
// Final output
+---+-------+
|id |type |
+---+-------+
|28 |Home |
|18 |Kitchen|
+---+-------+
编辑:
请包含以下用于导入的 Maven 依赖项 org.json.JSONArray
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20201115</version>
</dependency>
我正在尝试构建一个基于销售自家产品的网上商店的推荐引擎。 对于示例,我将保持这个简单。使用 scala 和 Spark。
我有一个包含 4 个字段的数据框。
1. A unique ID (INT)
2. A ProductName (String)
3. A ProductPrice (Number)
4. ProductCategories (Json field)
我还有第二个数据框,其中包含这些产品的销售情况。 不管怎样,我在看一个电影推荐引擎的例子,它相当简单。使用电影镜头数据集。我希望将其转换为此处的产品示例。 但是,每个产品的 ProductCategories 可能如下所示:
例如
[{'id': 28, 'type': 'Home'}, {'id': 18, 'type': 'Kitchen'}, {'id': 53, 'type': 'Living'}]
[{'id': 28, 'type': 'Home'}, {'id': 23, 'type': 'Bathroom'}]
当我将数据加载到数据框中时,它是一个 json 字符串。
这是一些数据框代码:
val ProductsDF2 = ProductsDF1_temp1.select(ProductsDF1_temp1.col("id"), ProductsDF1_temp1.col("product_name"), ProductsDF1_temp1.col("product_price"), ProductsDF1_temp1.col("product_categories"))
如何操作代码,以便数据框将类别从 product_categories (json) 中提取到它们自己的列中,如下所示:
|编号 |产品名称 |产品价格 |首页(布尔值) |厨房(布尔)|生活(布尔)| ...等等
我认为这是我需要实现的目标,才能让推荐引擎正常工作。 我相信有点像一种热门编码。
如有任何建议,我们将不胜感激。我对此有点陌生。
谢谢 缺点
希望你觉得这有用
// Let us assume that the dataframe is defined as a variable df
// we need to parse the string json array data in column_name column which is available as a json string
df.show
// output of df
+----------------------------------------------------+
|column_name |
+----------------------------------------------------+
|[{"id":28,"type":"Home"},{"id":18,"type":"Kitchen"}]|
+----------------------------------------------------+
df.printSchema()
root
|-- column_name: string (nullable = true)
// Following is my answer
import org.apache.spark.sql.functions._
import org.json.JSONArray
// This udf converts the json array to an array of json string
val toArray = udf { (data: String) => {
val jsonArray = new JSONArray(data)
var arr: Array[String] = Array()
val objects = (0 until jsonArray.length).map(x => jsonArray.getJSONObject(x))
objects.foreach { elem =>
arr :+= elem.toString
}
arr
}
}
val df1 = df.withColumn("column_name", toArray(col("column_name")))
df1.printSchema()
root
|-- column_name: array (nullable = true)
| |-- element: string (containsNull = true)
val df2 = df1.withColumn("column_name", explode(col("column_name")))
// here we are extracting the data from the json string using the schema of the json string data
val schema = spark.read.json(df1.select("column_name").rdd.map(x => x(0).toString)).schema
df2.withColumn("column_name", from_json(col("column_name"), schema))
.select(col("column_name.*"))
.show(false)
// Final output
+---+-------+
|id |type |
+---+-------+
|28 |Home |
|18 |Kitchen|
+---+-------+
编辑:
请包含以下用于导入的 Maven 依赖项 org.json.JSONArray
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20201115</version>
</dependency>