Spark:如何在 Dataframe API 中翻译 count(distinct(value))
Spark: How to translate count(distinct(value)) in Dataframe API's
我正在尝试比较聚合数据的不同方法。
这是我的输入数据,包含 2 个元素(页面、访问者):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
使用以下代码将 SQL 命令用于 Spark SQL:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
我得到这个输出:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
现在,我想使用 Dataframes 和他们 API 获得相同的结果,但我无法获得相同的输出:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
事实上,这就是我得到的输出:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
你需要的是DataFrame聚合函数countDistinct
:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2))
.toDF()
val result = logs.select("page","visitor")
.groupBy('page)
.agg('page, countDistinct('visitor))
result.foreach(println)
您可以使用 dataframe 的 groupBy
命令两次来执行此操作。这里,df1
是您的原始输入。
val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))
此命令将产生以下结果:
page visitor count
---- ------ ----
PAG2 V2 2
PAG1 V3 1
PAG1 V1 5
PAG1 V2 2
PAG2 V1 2
然后再次使用groupBy
命令得到最终结果
df2.groupBy($"page").agg(count($"visitor").as("count"))
最终输出:
page count
---- ----
PAG1 3
PAG2 2
我认为在较新版本的 Spark 中它更容易。下面是用2.4.0测试的。
1. 首先,为样本创建一个数组。
val myArr = Array(
("PAG1","V1"),
("PAG1","V1"),
("PAG2","V1"),
("PAG2","V2"),
("PAG2","V1"),
("PAG1","V1"),
("PAG1","V2"),
("PAG1","V1"),
("PAG1","V2"),
("PAG1","V1"),
("PAG2","V2"),
("PAG1","V3")
)
2。创建数据框
val logs = spark.createDataFrame(myArr)
.withColumnRenamed("_1","page")
.withColumnRenamed("_2","visitor")
3。现在使用 distinctCount spark sql 函数
进行聚合
import org.apache.spark.sql.{functions => F}
logs.groupBy("page").agg(
F.countDistinct("visitor").as("visitor"))
.show()
4。预期结果:
+----+-------+
|page|visitor|
+----+-------+
|PAG1| 3|
|PAG2| 2|
+----+-------+
如果要显示列的不同值,请使用此选项
display(sparkDF.select('columnName').distinct())
我正在尝试比较聚合数据的不同方法。
这是我的输入数据,包含 2 个元素(页面、访问者):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
使用以下代码将 SQL 命令用于 Spark SQL:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
我得到这个输出:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
现在,我想使用 Dataframes 和他们 API 获得相同的结果,但我无法获得相同的输出:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
事实上,这就是我得到的输出:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
你需要的是DataFrame聚合函数countDistinct
:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2))
.toDF()
val result = logs.select("page","visitor")
.groupBy('page)
.agg('page, countDistinct('visitor))
result.foreach(println)
您可以使用 dataframe 的 groupBy
命令两次来执行此操作。这里,df1
是您的原始输入。
val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))
此命令将产生以下结果:
page visitor count
---- ------ ----
PAG2 V2 2
PAG1 V3 1
PAG1 V1 5
PAG1 V2 2
PAG2 V1 2
然后再次使用groupBy
命令得到最终结果
df2.groupBy($"page").agg(count($"visitor").as("count"))
最终输出:
page count
---- ----
PAG1 3
PAG2 2
我认为在较新版本的 Spark 中它更容易。下面是用2.4.0测试的。 1. 首先,为样本创建一个数组。
val myArr = Array(
("PAG1","V1"),
("PAG1","V1"),
("PAG2","V1"),
("PAG2","V2"),
("PAG2","V1"),
("PAG1","V1"),
("PAG1","V2"),
("PAG1","V1"),
("PAG1","V2"),
("PAG1","V1"),
("PAG2","V2"),
("PAG1","V3")
)
2。创建数据框
val logs = spark.createDataFrame(myArr)
.withColumnRenamed("_1","page")
.withColumnRenamed("_2","visitor")
3。现在使用 distinctCount spark sql 函数
进行聚合import org.apache.spark.sql.{functions => F}
logs.groupBy("page").agg(
F.countDistinct("visitor").as("visitor"))
.show()
4。预期结果:
+----+-------+
|page|visitor|
+----+-------+
|PAG1| 3|
|PAG2| 2|
+----+-------+
如果要显示列的不同值,请使用此选项
display(sparkDF.select('columnName').distinct())