从 SQL 查询创建 Spark Dataframe
Create Spark Dataframe from SQL Query
我确定这是一个简单的SQL上下文问题,但我在 Spark 文档或 Whosebug 中找不到任何答案
我想根据 MySQL
上的 SQL 查询创建一个 Spark Dataframe
例如,我有一个复杂的 MySQL 查询,例如
SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...
我想要一个包含 X、Y 和 Z 列的数据框
我想出了如何将整个 table 加载到 Spark 中,我可以加载它们,然后在那里进行连接和选择。然而,这是非常低效的。我只想加载由我的 SQL 查询生成的 table。
这是我目前的近似代码,它不起作用。 Mysql-连接器有一个选项"dbtable",可用于加载整个table。我希望有一些方法可以指定查询
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
).load()
如果您的 table
已经在 SQLContext 中注册,您可以简单地使用 sql
方法。
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
要将查询的输出保存到新的数据框,只需将结果设置为一个变量即可:
val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
现在 newDataFrame
是一个具有所有可用数据框功能的数据框。
TL;DR: 只需在您的数据库中创建一个视图。
详情:
我的 postgres 数据库中有一个 table t_city,我在其上创建了一个视图:
create view v_city_3500 as
select asciiname, country, population, elevation
from t_city
where elevation>3500
and population>100000
select * from v_city_3500;
asciiname | country | population | elevation
-----------+---------+------------+-----------
Potosi | BO | 141251 | 3967
Oruro | BO | 208684 | 3936
La Paz | BO | 812799 | 3782
Lhasa | CN | 118721 | 3651
Puno | PE | 116552 | 3825
Juliaca | PE | 245675 | 3834
在火花-shell:
val sx= new org.apache.spark.sql.SQLContext(sc)
var props=new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver" )
val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
val city_df=sx.read.jdbc(url=url,table="t_city",props)
val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)
结果:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
我在这里找到这个
dbname 参数可以是用别名括在括号中的任何查询。所以就我而言,我需要这样做:
val query = """
(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100) foo
"""
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
option("dbtable",query).
load()
正如预期的那样,将每个 table 作为自己的 Dataframe 加载并将它们加入 Spark 是非常低效的。
MYSQL read/loading 数据如下
val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
"dbtable" -> "TABLE_NAME")).load()
如下向table写入数据
import java.util.Properties
val prop = new Properties()
prop.put("user", "<>")
prop.put("password", "simple3")
val dfWriter = jdbcDF.write.mode("append")
dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
要从查询创建数据框,请执行如下操作
val finalModelDataDF = {
val query = "select * from table_name"
sqlContext.sql(query)
};
finalModelDataDF.show()
我确定这是一个简单的SQL上下文问题,但我在 Spark 文档或 Whosebug 中找不到任何答案
我想根据 MySQL
上的 SQL 查询创建一个 Spark Dataframe例如,我有一个复杂的 MySQL 查询,例如
SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...
我想要一个包含 X、Y 和 Z 列的数据框
我想出了如何将整个 table 加载到 Spark 中,我可以加载它们,然后在那里进行连接和选择。然而,这是非常低效的。我只想加载由我的 SQL 查询生成的 table。
这是我目前的近似代码,它不起作用。 Mysql-连接器有一个选项"dbtable",可用于加载整个table。我希望有一些方法可以指定查询
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
).load()
如果您的 table
已经在 SQLContext 中注册,您可以简单地使用 sql
方法。
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
要将查询的输出保存到新的数据框,只需将结果设置为一个变量即可:
val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
现在 newDataFrame
是一个具有所有可用数据框功能的数据框。
TL;DR: 只需在您的数据库中创建一个视图。
详情: 我的 postgres 数据库中有一个 table t_city,我在其上创建了一个视图:
create view v_city_3500 as
select asciiname, country, population, elevation
from t_city
where elevation>3500
and population>100000
select * from v_city_3500;
asciiname | country | population | elevation
-----------+---------+------------+-----------
Potosi | BO | 141251 | 3967
Oruro | BO | 208684 | 3936
La Paz | BO | 812799 | 3782
Lhasa | CN | 118721 | 3651
Puno | PE | 116552 | 3825
Juliaca | PE | 245675 | 3834
在火花-shell:
val sx= new org.apache.spark.sql.SQLContext(sc)
var props=new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver" )
val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
val city_df=sx.read.jdbc(url=url,table="t_city",props)
val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)
结果:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
我在这里找到这个
dbname 参数可以是用别名括在括号中的任何查询。所以就我而言,我需要这样做:
val query = """
(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100) foo
"""
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
option("dbtable",query).
load()
正如预期的那样,将每个 table 作为自己的 Dataframe 加载并将它们加入 Spark 是非常低效的。
MYSQL read/loading 数据如下
val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
"dbtable" -> "TABLE_NAME")).load()
如下向table写入数据
import java.util.Properties
val prop = new Properties()
prop.put("user", "<>")
prop.put("password", "simple3")
val dfWriter = jdbcDF.write.mode("append")
dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
要从查询创建数据框,请执行如下操作
val finalModelDataDF = {
val query = "select * from table_name"
sqlContext.sql(query)
};
finalModelDataDF.show()