从 hdfs 读取并写入 MySQL
Read from hdfs and write to MySQL
我是大数据开发新手。我有一个用例从 hdfs 读取数据,通过 spark 处理并保存到 MySQL db。保存到 MySQL 数据库的原因是报告工具指向 MySQL。
所以我想出了下面的流程来实现它。任何人都可以验证并建议需要任何 optimization/changes。
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema","true")
.option("nullValue","NA")
.option("mode","failfast")
.load("hdfs://localhost:9000/user/testuser/samples.csv")
val resultsdf = df.select("Sample","p16","Age","Race").filter($"Anatomy".like("BOT"))
val prop=new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "pw")
val url = "jdbc:mysql://localhost:3306/meta"
df.write.mode(SaveMode.Append).jdbc(url,"sample_metrics",prop)
此行需要更改 val resultdf= ...
,您正在使用列 Anatomy 进行过滤,但您没有 select 该列是 select 子句。添加该列,否则您将得到错误 - Analysis Exception unable to resolve column Anatomy.
val resultsdf = df.select("Sample","p16","Age","Race", "Anatomy").filter($"Anatomy".like("BOT"))
优化:
您可以使用其他属性,例如 numPartitions
和 batchsize
。
您可以阅读这些属性 here.
我是大数据开发新手。我有一个用例从 hdfs 读取数据,通过 spark 处理并保存到 MySQL db。保存到 MySQL 数据库的原因是报告工具指向 MySQL。 所以我想出了下面的流程来实现它。任何人都可以验证并建议需要任何 optimization/changes。
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema","true")
.option("nullValue","NA")
.option("mode","failfast")
.load("hdfs://localhost:9000/user/testuser/samples.csv")
val resultsdf = df.select("Sample","p16","Age","Race").filter($"Anatomy".like("BOT"))
val prop=new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "pw")
val url = "jdbc:mysql://localhost:3306/meta"
df.write.mode(SaveMode.Append).jdbc(url,"sample_metrics",prop)
此行需要更改 val resultdf= ...
,您正在使用列 Anatomy 进行过滤,但您没有 select 该列是 select 子句。添加该列,否则您将得到错误 - Analysis Exception unable to resolve column Anatomy.
val resultsdf = df.select("Sample","p16","Age","Race", "Anatomy").filter($"Anatomy".like("BOT"))
优化:
您可以使用其他属性,例如 numPartitions
和 batchsize
。
您可以阅读这些属性 here.