使用 Apache Spark 从数据框中获取不同的计数
Getting a distinct count from a dataframe using Apache Spark
我有这样的数据
+--------------+---------+-------+---------+
| dataOne|OtherData|dataTwo|dataThree|
+--------------+---------|-------+---------+
| Best| tree| 5| 533|
| OK| bush| e| 3535|
| MEH| cow| -| 3353|
| MEH| oak| none| 12|
+--------------+---------+-------+---------+
我正在尝试将其放入
的输出中
+--------------+---------+
| dataOne| Count|
+--------------+---------|
| Best| 1|
| OK| 1|
| Meh| 2|
+--------------+---------+
我可以毫无问题地将 dataOne 单独放入数据框中并显示它的内容,以确保我只是抓取 dataOne 列,
但是,我似乎找不到将 sql 查询转换为我需要的数据的正确语法。我尝试从整个数据集
创建的临时视图中创建以下数据框
Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from
dataFrame group by dataOne");
dataOneCount.show();
但是火花
我能够找到的文档仅显示了如何在 spark 1.6 和之前的版本中进行这种类型的聚合,因此我们将不胜感激。
这是我收到的错误消息,但是我已经检查了数据,其中没有索引错误。
java.lang.ArrayIndexOutOfBoundsException: 11
我也试过应用 functions() 方法 countDistinct
Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();
其中 dataOneDataFrame 是从 运行
创建的数据帧
select dataOne from dataFrame
但它 returns 是一个分析异常,我对 spark 还是个新手所以我不确定 how/when 是否有错误我正在评估 countDistinct 方法
编辑:为了澄清,显示的第一个 table 是我通过读取文本文件并对其应用自定义模式(它们仍然都是字符串)创建的数据帧的结果
Dataset<Row> dataFrame
这是我的完整代码
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Log File Reader")
.getOrCreate();
//args[0] is the textfile location
JavaRDD<String> logsRDD = spark.sparkContext()
.textFile(args[0],1)
.toJavaRDD();
String schemaString = "dataOne OtherData dataTwo dataThree";
List<StructField> fields = new ArrayList<>();
String[] fieldName = schemaString.split(" ");
for (String field : fieldName){
fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(" ");
return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
});
Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);
//first attempt
dF.groupBy(col("dataOne")).count().show();
//Trying with a sql statement
dF.createOrReplaceTempView("view");
dF.sparkSession().sql("select command, count(*) from view group by command").show();
最有可能想到的是 returns 行使用 RowFactory 的 lambda 函数?这个想法似乎不错,但我不确定它是否真的成立,或者是否有其他方法可以做到。除此之外,我很困惑
示例数据
best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12
为方便起见,使用 Scala 语法。它与 Java 语法非常相似:
// Input data
val df = {
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("dataOne", StringType) ::
StructField("OtherData", StringType) ::
StructField("dataTwo", StringType) ::
StructField("dataThree", IntegerType) :: Nil)
val data = List(
Row("Best", "tree", "5", 533),
Row("OK", "bush", "e", 3535),
Row("MEH", "cow", "-", 3353),
Row("MEH", "oak", "none", 12)
)
spark.createDataFrame(data.asJava, simpleSchema)
}
df.show
+-------+---------+-------+---------+
|dataOne|OtherData|dataTwo|dataThree|
+-------+---------+-------+---------+
| Best| tree| 5| 533|
| OK| bush| e| 3535|
| MEH| cow| -| 3353|
| MEH| oak| none| 12|
+-------+---------+-------+---------+
df.groupBy(col("dataOne")).count().show()
+-------+-----+
|dataOne|count|
+-------+-----+
| MEH| 2|
| Best| 1|
| OK| 1|
+-------+-----+
我可以使用 S3 上的四行数据文件提交上面给出的 Java 代码,它工作正常:
$SPARK_HOME/bin/spark-submit \
--class sparktest.FromWhosebug \
--packages "org.apache.hadoop:hadoop-aws:2.7.3" \
target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt"
我有这样的数据
+--------------+---------+-------+---------+
| dataOne|OtherData|dataTwo|dataThree|
+--------------+---------|-------+---------+
| Best| tree| 5| 533|
| OK| bush| e| 3535|
| MEH| cow| -| 3353|
| MEH| oak| none| 12|
+--------------+---------+-------+---------+
我正在尝试将其放入
的输出中+--------------+---------+
| dataOne| Count|
+--------------+---------|
| Best| 1|
| OK| 1|
| Meh| 2|
+--------------+---------+
我可以毫无问题地将 dataOne 单独放入数据框中并显示它的内容,以确保我只是抓取 dataOne 列, 但是,我似乎找不到将 sql 查询转换为我需要的数据的正确语法。我尝试从整个数据集
创建的临时视图中创建以下数据框Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from
dataFrame group by dataOne");
dataOneCount.show();
但是火花 我能够找到的文档仅显示了如何在 spark 1.6 和之前的版本中进行这种类型的聚合,因此我们将不胜感激。
这是我收到的错误消息,但是我已经检查了数据,其中没有索引错误。
java.lang.ArrayIndexOutOfBoundsException: 11
我也试过应用 functions() 方法 countDistinct
Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();
其中 dataOneDataFrame 是从 运行
创建的数据帧select dataOne from dataFrame
但它 returns 是一个分析异常,我对 spark 还是个新手所以我不确定 how/when 是否有错误我正在评估 countDistinct 方法
编辑:为了澄清,显示的第一个 table 是我通过读取文本文件并对其应用自定义模式(它们仍然都是字符串)创建的数据帧的结果
Dataset<Row> dataFrame
这是我的完整代码
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Log File Reader")
.getOrCreate();
//args[0] is the textfile location
JavaRDD<String> logsRDD = spark.sparkContext()
.textFile(args[0],1)
.toJavaRDD();
String schemaString = "dataOne OtherData dataTwo dataThree";
List<StructField> fields = new ArrayList<>();
String[] fieldName = schemaString.split(" ");
for (String field : fieldName){
fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(" ");
return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
});
Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);
//first attempt
dF.groupBy(col("dataOne")).count().show();
//Trying with a sql statement
dF.createOrReplaceTempView("view");
dF.sparkSession().sql("select command, count(*) from view group by command").show();
最有可能想到的是 returns 行使用 RowFactory 的 lambda 函数?这个想法似乎不错,但我不确定它是否真的成立,或者是否有其他方法可以做到。除此之外,我很困惑
示例数据
best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12
为方便起见,使用 Scala 语法。它与 Java 语法非常相似:
// Input data
val df = {
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("dataOne", StringType) ::
StructField("OtherData", StringType) ::
StructField("dataTwo", StringType) ::
StructField("dataThree", IntegerType) :: Nil)
val data = List(
Row("Best", "tree", "5", 533),
Row("OK", "bush", "e", 3535),
Row("MEH", "cow", "-", 3353),
Row("MEH", "oak", "none", 12)
)
spark.createDataFrame(data.asJava, simpleSchema)
}
df.show
+-------+---------+-------+---------+ |dataOne|OtherData|dataTwo|dataThree| +-------+---------+-------+---------+ | Best| tree| 5| 533| | OK| bush| e| 3535| | MEH| cow| -| 3353| | MEH| oak| none| 12| +-------+---------+-------+---------+
df.groupBy(col("dataOne")).count().show()
+-------+-----+ |dataOne|count| +-------+-----+ | MEH| 2| | Best| 1| | OK| 1| +-------+-----+
我可以使用 S3 上的四行数据文件提交上面给出的 Java 代码,它工作正常:
$SPARK_HOME/bin/spark-submit \
--class sparktest.FromWhosebug \
--packages "org.apache.hadoop:hadoop-aws:2.7.3" \
target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt"