如何在 Spark 2.3.1 中使用 map 和 reduce 函数执行分组和计数

How to perform group and count using map and reduce function in Spark 2.3.1

我是一个新的火花蜜蜂,我正在尝试使用以下火花函数执行分组和计数:

 Dataset<Row> result =  dataset
       .groupBy("column1", "column2")
       .count();

但我读到 here 使用 group by 不是一个好主意,因为它没有组合器,这反过来会影响 spark 作业的运行时效率。 相反,应该使用 reduceByKey 函数进行聚合操作。

所以我尝试使用 reduceByKey 功能,但它不适用于 dataset。相反,数据集使用 reduce(ReduceFunction<Row> func).

由于找不到使用reduce函数执行分组和计数的示例,我尝试将其转换为JavaRDD并使用reduceByKey:

//map each row to 1 and then group them by key 
JavaPairRDD<String[], Integer> mapOnes;
            try {
                 mapOnes = dailySummary.javaRDD().mapToPair(
                        new PairFunction<Row, String[], Integer>() {
                            @Override
                            public Tuple2<String[], Integer> call(Row t) throws Exception {
                                return new Tuple2<String[], Integer>(new String[]{t.getAs("column1"), t.getAs("column2")}, 1);
                            }   
                });
            }catch(Exception e) {
                log.error("exception in mapping ones: "+e);
                throw new Exception();
            }


        JavaPairRDD<String[], Integer> rowCount;
        try {
            rowCount = mapOnes.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1+v2;
                    }
                });
        }catch(Exception e) {
            log.error("exception in reduce by key: "+e);
            throw new Exception();
        }

但这也为 mapToPair 函数提供了异常 org.apache.spark.SparkException: Task not serializable

任何人都可以建议一种更好的方法来使用数据集的 reducemap 函数进行分组和执行计数。

感谢任何帮助。

你添加的link中的groupBy指的是RDD。在 RDD 语义中,groupBy 基本上会根据键对所有数据进行混洗,即将与键相关的所有值带到一个地方。

这就是为什么建议使用 reduceByKey 的原因,因为 reduceByKey 首先对每个分区执行归约操作,并且只有减少的值会被打乱,这意味着流量会少很多(并且可以防止将所有内容都放到一个分区时出现内存不足的问题)。

在数据集中,groupBy 的行为不同。它不提供数据集作为返回对象,而是提供 KeyValueGroupedDataset 对象。当您确实依靠这个对象(或更通用的 agg)时,它基本上定义了一个与 reduceByKey 非常相似的 reducer。

这意味着不需要单独的reduceByKey方法(数据集groupby实际上是reduceByKey的一种形式)。

坚持原来的 groupBy(...).count(...)

基于包含 2 列的数据集,一列是美国的县名,另一列是美国的州名。

期望的输出:

reduce()
Autauga County, Alabama, Baldwin County, Alabama, Barbour County, Alabama, Bibb County, Alabama, Blount County, Alabama, Bullock County, Alabama, Butler County, Alabama, Calhoun County, Alabama, Chambers County, Alabama, Cherokee County, Alabama, Chilton County,
…

用法:

System.out.println("reduce()");
String listOfCountyStateDs = countyStateDs
    .reduce(
        new CountyStateConcatenatorUsingReduce());
System.out.println(listOfCountyStateDs);

实施:

 private final class CountyStateConcatenatorUsingReduce
      implements ReduceFunction<String> {
    private static final long serialVersionUID = 12859L;

    @Override
    public String call(String v1, String v2) throws Exception {
      return v1 + ", " + v2;
    }
  }

但是,您将不得不编写自己的逻辑,这可能很耗时,而且无论如何您更愿意使用 groupBy...