How do I create a new DataFrame based on an old DataFrame?
I have a CSV file, dbname1.table1.csv:
| target             | source        | source_table                                | relation_type |
|--------------------|---------------|---------------------------------------------|---------------|
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust  | direct        |
| avg_ensure_sum_12m | protocol_dttm | custom_cib_ml_stg.p_overall_part_tend_cust  | direct        |
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust  | indirect      |
The same table in CSV format:
target,source,source_table,relation_type
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,protocol_dttm,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,indirect
Then I create a DataFrame by reading it:
val dfDL = spark.read.option("delimiter", ",")
.option("header", true)
.csv(file.getPath.toUri.getPath)
Now I need to create a new DataFrame based on dfDL.
The structure of the new DataFrame should look like this:
case class DataLink(schema_from: String,
table_from: String,
column_from: String,
link_type: String,
schema_to: String,
table_to: String,
column_to: String)
The fields of the new DataFrame are derived from the CSV data as follows (pseudocode; a small sketch of the splits follows below):
schema_from = source_table.split(".")(0) // Example: custom_cib_ml_stg
table_from = source_table.split(".")(1) // Example: p_overall_part_tend_cust
column_from = source // Example: inn_num
link_type = relation_type // Example: direct
schema_to = "dbname1.table1.csv".split(".")(0) // Example: dbname1
table_to = "dbname1.table1.csv".split(".")(1) // Example: table1
column_to = target // Example: avg_ensure_sum_12m
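(Note: Scala's String.split takes a regular expression, so in real code the dot has to be escaped as "\\."; a minimal sketch of the splits above, with the file name hard-coded only for illustration:)
val fileName    = "dbname1.table1.csv"
val sourceTable = "custom_cib_ml_stg.p_overall_part_tend_cust"

val Array(schemaFrom, tableFrom) = sourceTable.split("\\.")  // custom_cib_ml_stg, p_overall_part_tend_cust
val Array(schemaTo, tableTo, _)  = fileName.split("\\.")     // dbname1, table1 (the "csv" suffix is dropped)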
I need to create this new DataFrame, but I can't manage it on my own.
P.S. I need this DataFrame so that I can later produce a JSON file from it.
Example JSON:
[{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"inn_num",
"link_type":"direct",
"schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"
},
{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"protocol_dttm",
"link_type":"direct","schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"}
I don't like my current implementation:
def readDLFromHDFS(file: LocatedFileStatus): Array[DataLink] = {
val arrTableName = file.getPath.getName.split("\\.")
val (schemaTo, tableTo) = (arrTableName(0), arrTableName(1))
val dfDL = spark.read.option("delimiter", ",")
.option("header", true)
.csv(file.getPath.toUri.getPath)
//val sourceTable = dfDL.select("source_table").collect().map(value => value.toString().split("."))
dfDL.collect.map(row => DataLink(row.getString(2).split("\\.")(0),
row.getString(2).split("\\.")(1),
row.getString(1),
row.getString(3),
schemaTo,
tableTo,
row.getString(0)))
}
def toJSON(dataLinks: Array[DataLink]): Option[JValue] =
dataLinks.map(Extraction.decompose).reduceOption(_ ++ _)
}
You definitely don't want to collect; that defeats the point of using Spark here. As usual with Spark, you have plenty of options. You could use RDDs, but I don't think there is any need to switch paradigms here. You just want to apply custom logic to some columns and end up with a DataFrame containing only the resulting columns.
First, define a UDF to apply:
def convert(target: String, source: String, source_table: String, relation_type: String): DataLink =
  DataLink(source_table.split("\\.")(0),
           source_table.split("\\.")(1),
           source,
           relation_type,
           "dbname1.table1.csv".split("\\.")(0),
           "dbname1.table1.csv".split("\\.")(1),
           target)
Then apply this function to the relevant columns (make sure to wrap it in udf so it becomes a Spark function rather than a plain Scala function) and select the result:
df.select(udf(convert _)($"target", $"source", $"source_table", $"relation_type"))
If you want the result to be a DataFrame with 7 columns:
df.select(
  split(col("source_table"), "\\.").getItem(0),
  split(col("source_table"), "\\.").getItem(1),
  col("source"),
  col("relation_type"),
  lit("dbname1"),
  lit("table1"),
  col("target")
)
You can also add .as("column_name") to each of these 7 columns.
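Putting it together, a minimal sketch of the 7-column select with aliases matching the DataLink fields ("dbname1" and "table1" are hard-coded here, as in the example above):
import org.apache.spark.sql.functions.{col, lit, split}

val result = df.select(
  split(col("source_table"), "\\.").getItem(0).as("schema_from"),
  split(col("source_table"), "\\.").getItem(1).as("table_from"),
  col("source").as("column_from"),
  col("relation_type").as("link_type"),
  lit("dbname1").as("schema_to"),
  lit("table1").as("table_to"),
  col("target").as("column_to")
)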
You can also work with Datasets directly.
import spark.implicits._
case class DataLink(schema_from: String,
table_from: String,
column_from: String,
link_type: String,
schema_to: String,
table_to: String,
column_to: String)
val filename = "dbname1.table1.csv"
val df = spark.read.option("header","true").csv("test.csv")
df.show(false)
+------------------+-------------+------------------------------------------+-------------+
|target |source |source_table |relation_type|
+------------------+-------------+------------------------------------------+-------------+
|avg_ensure_sum_12m|inn_num |custom_cib_ml_stg.p_overall_part_tend_cust|direct |
|avg_ensure_sum_12m|protocol_dttm|custom_cib_ml_stg.p_overall_part_tend_cust|direct |
|avg_ensure_sum_12m|inn_num |custom_cib_ml_stg.p_overall_part_tend_cust|indirect |
+------------------+-------------+------------------------------------------+-------------+
df.createOrReplaceTempView("table")
val df2 = spark.sql(s"""
select split(source_table, '[.]')[0] as schema_from
, split(source_table, '[.]')[1] as table_from
, source as column_from
, relation_type as link_type
, split('${filename}', '[.]')[0] as schema_to
, split('${filename}', '[.]')[1] as table_to
, target as column_to
from table
""").as[DataLink]
df2.show()
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
| schema_from| table_from| column_from|link_type|schema_to|table_to| column_to|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
|custom_cib_ml_stg|p_overall_part_te...| inn_num| direct| dbname1| table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...|protocol_dttm| direct| dbname1| table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...| inn_num| indirect| dbname1| table1|avg_ensure_sum_12m|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
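Since the end goal is a JSON file, the resulting DataFrame can be written out directly. A minimal sketch, with the output path chosen only for illustration (note that write.json produces JSON Lines, one object per line, rather than a single JSON array):
// JSON Lines output: one JSON object per line; coalesce(1) just yields a single part file
df2.coalesce(1).write.mode("overwrite").json("/tmp/datalinks")

// Alternatively, build a single JSON array string on the driver (fine for small results)
val jsonArray = "[" + df2.toJSON.collect().mkString(",") + "]"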
My progress so far: I can now create the new DataFrame, but it only contains a single column.
val dfDL = spark.read.option("delimiter", ",")
.option("header", true)
.csv(file.getPath.toUri.getPath)
val convertCase = (target: String, source: String, source_table: String, relation_type: String) =>
DataLink(
source_table.split("\.")(0),
source_table.split("\.")(1),
source,
relation_type,
schemaTo,
tableTo,
target
)
val udfConvert = udf(convertCase)
val dfForJson = dfDL.select(udfConvert(col("target"),
col("source"),
col("source_table"),
col("relation_type")))