如何使用 java api 在 Apache Spark 数据集中按 desc 排序?
How to order by desc in Apache Spark Dataset using java api?
我正在使用 spark session 读取文件,然后拆分单词并计算单词的迭代次数。我需要按降序显示数据
// Build a local SparkSession for the word-count example.
SparkSession sparkSession = SparkSession
        .builder()
        .appName("Java Spark SQL basic example")
        .config("spark.master", "local")
        .getOrCreate();

// Read the text file, split each line into words, and wrap each word in a bean.
// BUG FIX: the original used "[\s.]", which is not a valid Java string literal
// (\s is an illegal escape and fails to compile); the regex must be written as
// "[\\s.]". The "+" collapses runs of delimiters, and the filter drops the
// empty token produced when a line starts with a delimiter.
JavaRDD<Word> textFile = sparkSession
        .read()
        .textFile("/Users/myname/Documents/README.txt")
        .javaRDD()
        .flatMap(s -> Arrays.asList(s.split("[\\s.]+")).iterator())
        .filter(w -> !w.isEmpty())
        .map(w -> {
            Word word = new Word();
            word.setWord(w.replace(",", "")); // strip commas attached to words
            return word;
        });

// Word count ordered descending. Column#desc() is the Java-friendly form;
// the static functions.desc(String) form also works once
// "import static org.apache.spark.sql.functions.*;" is in place.
Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();
当我使用 org.apache.spark.sql.functions.col("count")
时,它工作正常但无法按照 https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String)
中的定义执行
df.sort(asc("dept"), desc("age"))
还有
没用。我想这是为了scala。 Java 中的这个等价物是什么?
您的代码应该按照 Spark Java 文档工作。您没有 post 导入语句。如果你还没有import
functions
。由于 desc()
和 asc()
函数在 functions
class 下。
所以你需要使用 org.apache.spark.sql.functions.asc("dept"), org.apache.spark.sql.functions.desc("age")
或使用静态导入:import static org.apache.spark.sql.functions.*;(注意必须是 static 导入,普通的 import 无法让 asc()/desc() 以不带类名的方式调用)
在 Java 中,您必须以这种方式导入包:
import static org.apache.spark.sql.functions.*
我用的是spark 2.4.0
- 将下一个键设为假:
spark.kryo.registrationRequired
或
- 添加到 kryo :
// Register the Catalyst classes Spark needs to serialize when sorting a
// Dataset with spark.kryo.registrationRequired=true; without these entries
// Kryo fails at runtime with "Class is not registered".
kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
// The trailing $ denotes Scala singleton objects (NullsFirst, Descending, ...).
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);
// Synthetic/anonymous classes cannot be referenced with a class literal, so
// they are looked up by name. NOTE(review): Class.forName throws the checked
// ClassNotFoundException — the enclosing method must declare or handle it.
// These internal class names are specific to Spark 2.4.0 and may differ in
// other Spark/Scala versions.
kryo.register(Class.forName("scala.math.Ordering$$anon"));
kryo.register(Class.forName("scala.reflect.ClassTag$$anon"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));
/** Shared Spark session used by {@link #main(String[])}. */
private static SparkSession session;

/**
 * Demonstrates an inner join between two bean-backed datasets, ordered
 * ascending on the person id, with the result mapped onto the
 * {@code Jointure} bean.
 */
public static void main(String[] args) {
    /* DUMMY DATA creation */
    List<Person> persons = Arrays.asList(
            new Person(1, "courriel_1@gmail.com", "nom_1"),
            new Person(2, "courriel_2@gmail.com", "nom_2"),
            new Person(3, "courriel_3@gmail.com", "nom_3"),
            new Person(4, "courriel_4@gmail.com", "nom_4"));
    List<Profession> professions = Arrays.asList(
            new Profession(1, 2, "profession_4"),
            new Profession(2, 1, "profession_2"),
            new Profession(3, 1, "profession_5"),
            new Profession(4, 2, "profession_2"),
            new Profession(5, 2, "profession_5"),
            new Profession(6, 3, "profession_7"),
            new Profession(7, 3, "profession_2"),
            new Profession(8, 4, "profession_2"),
            new Profession(9, 4, "profession_7"));

    // SparkAppConfiguration.load(args);
    // LaunchArgsEncoder launchArgs = SparkAppConfiguration.getLaunchArgs();
    // Initialize the session
    session = SparkUtils.initSession("test jointure");

    /* Convert from Java list to Spark Dataset */
    Dataset<Row> rowPerson = session.createDataFrame(persons, Person.class);
    System.out.println("rowPerson.show();");
    rowPerson.show();

    // Rename "id" so the join condition can reference each side unambiguously.
    Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
    System.out.println("personRenamed.show();");
    personRenamed.show();

    Dataset<Row> rowProfession = session.createDataFrame(professions, Profession.class);
    System.out.println("rowProfession.show();");
    rowProfession.show();

    Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
    System.out.println("professionRenamed.show();");
    professionRenamed.show();

    /* INNER JOIN IN Spark Java */
    Dataset<Row> innerJoinData = personRenamed.join(
            professionRenamed,
            personRenamed.col("personId").equalTo(professionRenamed.col("personFk")));
    System.out.println("innerJoinData.show();");
    innerJoinData.show();

    // Project the joined columns, sort ascending (Column#asc() is the Java
    // equivalent of Scala's asc("personId")), and map rows to the bean.
    Dataset<Jointure> joinResult = innerJoinData
            .select("personId", "nom", "courriel", "id", "profession")
            .orderBy(org.apache.spark.sql.functions.col("personId").asc())
            .as(Encoders.bean(Jointure.class));
    System.out.println("joinResult.show();");
    joinResult.show();
    System.out.println("joinResult.printSchema();");
    joinResult.printSchema();
    System.exit(0);
}
/**
 * Serializable bean describing a person. Spark's bean encoder relies on the
 * no-arg constructor together with the getters/setters.
 */
public class Person implements Serializable {

    private static final long serialVersionUID = 7327130742162877288L;

    private long personId;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;
    private String ville;

    /** Required by Spark's bean encoder / reflection. */
    public Person() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Person(long personId, String nom, String prenom, String courriel, String profession, String ville) {
        this.personId = personId;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
        this.ville = ville;
    }

    //getter and setter
}
/**
 * Serializable bean linking one person ({@code personId}) to one profession
 * entry. Spark's bean encoder relies on the no-arg constructor plus accessors.
 */
public class Profession implements Serializable {

    private static final long serialVersionUID = 7845266779357094461L;

    private long id;
    private long personId;
    private String profession;

    /** Required by Spark's bean encoder / reflection. */
    public Profession() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Profession(long id, long personId, String profession) {
        this.id = id;
        this.personId = personId;
        this.profession = profession;
    }

    //getter and setter
}
/**
 * Serializable bean receiving the projected columns of the person/profession
 * join ({@code Encoders.bean(Jointure.class)}).
 */
public class Jointure implements Serializable {

    private static final long serialVersionUID = 4341834876589947018L;

    private long id;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;

    /** Required by Spark's bean encoder / reflection. */
    public Jointure() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Jointure(long id, String nom, String prenom, String courriel, String profession) {
        this.id = id;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
    }

    //getter and setter
}
rowPerson.show();
+--------------------+---+-----+
| courriel| id| nom|
+--------------------+---+-----+
|courriel_1@gmail.com| 1|nom_1|
|courriel_2@gmail.com| 2|nom_2|
|courriel_3@gmail.com| 3|nom_3|
|courriel_4@gmail.com| 4|nom_4|
+--------------------+---+-----+
personRenamed.show();
+--------------------+--------+-----+
| courriel|personId| nom|
+--------------------+--------+-----+
|courriel_1@gmail.com| 1|nom_1|
|courriel_2@gmail.com| 2|nom_2|
|courriel_3@gmail.com| 3|nom_3|
|courriel_4@gmail.com| 4|nom_4|
+--------------------+--------+-----+
rowProfession.show();
+---+--------+------------+
| id|personId| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
professionRenamed.show();
+---+--------+------------+
| id|personFk| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
innerJoinData.show();
+--------------------+--------+-----+---+--------+------------+
| courriel|personId| nom| id|personFk| profession|
+--------------------+--------+-----+---+--------+------------+
|courriel_2@gmail.com| 2|nom_2| 1| 2|profession_4|
|courriel_1@gmail.com| 1|nom_1| 2| 1|profession_2|
|courriel_1@gmail.com| 1|nom_1| 3| 1|profession_5|
|courriel_2@gmail.com| 2|nom_2| 4| 2|profession_2|
|courriel_2@gmail.com| 2|nom_2| 5| 2|profession_5|
|courriel_3@gmail.com| 3|nom_3| 6| 3|profession_7|
|courriel_3@gmail.com| 3|nom_3| 7| 3|profession_2|
|courriel_4@gmail.com| 4|nom_4| 8| 4|profession_2|
|courriel_4@gmail.com| 4|nom_4| 9| 4|profession_7|
+--------------------+--------+-----+---+--------+------------+
joinResult.show();
+--------+-----+--------------------+---+------------+
|personId| nom| courriel| id| profession|
+--------+-----+--------------------+---+------------+
| 1|nom_1|courriel_1@gmail.com| 3|profession_5|
| 1|nom_1|courriel_1@gmail.com| 2|profession_2|
| 2|nom_2|courriel_2@gmail.com| 4|profession_2|
| 2|nom_2|courriel_2@gmail.com| 5|profession_5|
| 2|nom_2|courriel_2@gmail.com| 1|profession_4|
| 3|nom_3|courriel_3@gmail.com| 7|profession_2|
| 3|nom_3|courriel_3@gmail.com| 6|profession_7|
| 4|nom_4|courriel_4@gmail.com| 8|profession_2|
| 4|nom_4|courriel_4@gmail.com| 9|profession_7|
+--------+-----+--------------------+---+------------+
joinResult.printSchema();
root
|-- personId: long (nullable = false)
|-- nom: string (nullable = true)
|-- courriel: string (nullable = true)
|-- id: long (nullable = false)
|-- profession: string (nullable = true)
我正在使用 spark session 读取文件,然后拆分单词并计算单词的迭代次数。我需要按降序显示数据
// Build a local SparkSession for the word-count example.
SparkSession sparkSession = SparkSession
        .builder()
        .appName("Java Spark SQL basic example")
        .config("spark.master", "local")
        .getOrCreate();

// Read the text file, split each line into words, and wrap each word in a bean.
// BUG FIX: the original used "[\s.]", which is not a valid Java string literal
// (\s is an illegal escape and fails to compile); the regex must be written as
// "[\\s.]". The "+" collapses runs of delimiters, and the filter drops the
// empty token produced when a line starts with a delimiter.
JavaRDD<Word> textFile = sparkSession
        .read()
        .textFile("/Users/myname/Documents/README.txt")
        .javaRDD()
        .flatMap(s -> Arrays.asList(s.split("[\\s.]+")).iterator())
        .filter(w -> !w.isEmpty())
        .map(w -> {
            Word word = new Word();
            word.setWord(w.replace(",", "")); // strip commas attached to words
            return word;
        });

// Word count ordered descending. Column#desc() is the Java-friendly form;
// the static functions.desc(String) form also works once
// "import static org.apache.spark.sql.functions.*;" is in place.
Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();
当我使用 org.apache.spark.sql.functions.col("count")
时,它工作正常但无法按照 https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String)
df.sort(asc("dept"), desc("age"))
还有
您的代码应该按照 Spark Java 文档工作。您没有 post 导入语句。如果你还没有import
functions
。由于 desc()
和 asc()
函数在 functions
class 下。
所以你需要使用 org.apache.spark.sql.functions.asc("dept"), org.apache.spark.sql.functions.desc("age")
或使用静态导入:import static org.apache.spark.sql.functions.*;(注意必须是 static 导入,普通的 import 无法让 asc()/desc() 以不带类名的方式调用)
在 Java 中,您必须以这种方式导入包:
import static org.apache.spark.sql.functions.*
我用的是spark 2.4.0
- 将下一个键设为假:
spark.kryo.registrationRequired
或
- 添加到 kryo :
// Register the Catalyst classes Spark needs to serialize when sorting a
// Dataset with spark.kryo.registrationRequired=true; without these entries
// Kryo fails at runtime with "Class is not registered".
kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
// The trailing $ denotes Scala singleton objects (NullsFirst, Descending, ...).
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);
// Synthetic/anonymous classes cannot be referenced with a class literal, so
// they are looked up by name. NOTE(review): Class.forName throws the checked
// ClassNotFoundException — the enclosing method must declare or handle it.
// These internal class names are specific to Spark 2.4.0 and may differ in
// other Spark/Scala versions.
kryo.register(Class.forName("scala.math.Ordering$$anon"));
kryo.register(Class.forName("scala.reflect.ClassTag$$anon"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));
/** Shared Spark session used by {@link #main(String[])}. */
private static SparkSession session;

/**
 * Demonstrates an inner join between two bean-backed datasets, ordered
 * ascending on the person id, with the result mapped onto the
 * {@code Jointure} bean.
 */
public static void main(String[] args) {
    /* DUMMY DATA creation */
    List<Person> persons = Arrays.asList(
            new Person(1, "courriel_1@gmail.com", "nom_1"),
            new Person(2, "courriel_2@gmail.com", "nom_2"),
            new Person(3, "courriel_3@gmail.com", "nom_3"),
            new Person(4, "courriel_4@gmail.com", "nom_4"));
    List<Profession> professions = Arrays.asList(
            new Profession(1, 2, "profession_4"),
            new Profession(2, 1, "profession_2"),
            new Profession(3, 1, "profession_5"),
            new Profession(4, 2, "profession_2"),
            new Profession(5, 2, "profession_5"),
            new Profession(6, 3, "profession_7"),
            new Profession(7, 3, "profession_2"),
            new Profession(8, 4, "profession_2"),
            new Profession(9, 4, "profession_7"));

    // SparkAppConfiguration.load(args);
    // LaunchArgsEncoder launchArgs = SparkAppConfiguration.getLaunchArgs();
    // Initialize the session
    session = SparkUtils.initSession("test jointure");

    /* Convert from Java list to Spark Dataset */
    Dataset<Row> rowPerson = session.createDataFrame(persons, Person.class);
    System.out.println("rowPerson.show();");
    rowPerson.show();

    // Rename "id" so the join condition can reference each side unambiguously.
    Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
    System.out.println("personRenamed.show();");
    personRenamed.show();

    Dataset<Row> rowProfession = session.createDataFrame(professions, Profession.class);
    System.out.println("rowProfession.show();");
    rowProfession.show();

    Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
    System.out.println("professionRenamed.show();");
    professionRenamed.show();

    /* INNER JOIN IN Spark Java */
    Dataset<Row> innerJoinData = personRenamed.join(
            professionRenamed,
            personRenamed.col("personId").equalTo(professionRenamed.col("personFk")));
    System.out.println("innerJoinData.show();");
    innerJoinData.show();

    // Project the joined columns, sort ascending (Column#asc() is the Java
    // equivalent of Scala's asc("personId")), and map rows to the bean.
    Dataset<Jointure> joinResult = innerJoinData
            .select("personId", "nom", "courriel", "id", "profession")
            .orderBy(org.apache.spark.sql.functions.col("personId").asc())
            .as(Encoders.bean(Jointure.class));
    System.out.println("joinResult.show();");
    joinResult.show();
    System.out.println("joinResult.printSchema();");
    joinResult.printSchema();
    System.exit(0);
}
/**
 * Serializable bean describing a person. Spark's bean encoder relies on the
 * no-arg constructor together with the getters/setters.
 */
public class Person implements Serializable {

    private static final long serialVersionUID = 7327130742162877288L;

    private long personId;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;
    private String ville;

    /** Required by Spark's bean encoder / reflection. */
    public Person() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Person(long personId, String nom, String prenom, String courriel, String profession, String ville) {
        this.personId = personId;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
        this.ville = ville;
    }

    //getter and setter
}
/**
 * Serializable bean linking one person ({@code personId}) to one profession
 * entry. Spark's bean encoder relies on the no-arg constructor plus accessors.
 */
public class Profession implements Serializable {

    private static final long serialVersionUID = 7845266779357094461L;

    private long id;
    private long personId;
    private String profession;

    /** Required by Spark's bean encoder / reflection. */
    public Profession() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Profession(long id, long personId, String profession) {
        this.id = id;
        this.personId = personId;
        this.profession = profession;
    }

    //getter and setter
}
/**
 * Serializable bean receiving the projected columns of the person/profession
 * join ({@code Encoders.bean(Jointure.class)}).
 */
public class Jointure implements Serializable {

    private static final long serialVersionUID = 4341834876589947018L;

    private long id;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;

    /** Required by Spark's bean encoder / reflection. */
    public Jointure() {
        super();
    }

    /** Convenience constructor populating every field. */
    public Jointure(long id, String nom, String prenom, String courriel, String profession) {
        this.id = id;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
    }

    //getter and setter
}
rowPerson.show();
+--------------------+---+-----+
| courriel| id| nom|
+--------------------+---+-----+
|courriel_1@gmail.com| 1|nom_1|
|courriel_2@gmail.com| 2|nom_2|
|courriel_3@gmail.com| 3|nom_3|
|courriel_4@gmail.com| 4|nom_4|
+--------------------+---+-----+
personRenamed.show();
+--------------------+--------+-----+
| courriel|personId| nom|
+--------------------+--------+-----+
|courriel_1@gmail.com| 1|nom_1|
|courriel_2@gmail.com| 2|nom_2|
|courriel_3@gmail.com| 3|nom_3|
|courriel_4@gmail.com| 4|nom_4|
+--------------------+--------+-----+
rowProfession.show();
+---+--------+------------+
| id|personId| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
professionRenamed.show();
+---+--------+------------+
| id|personFk| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
innerJoinData.show();
+--------------------+--------+-----+---+--------+------------+
| courriel|personId| nom| id|personFk| profession|
+--------------------+--------+-----+---+--------+------------+
|courriel_2@gmail.com| 2|nom_2| 1| 2|profession_4|
|courriel_1@gmail.com| 1|nom_1| 2| 1|profession_2|
|courriel_1@gmail.com| 1|nom_1| 3| 1|profession_5|
|courriel_2@gmail.com| 2|nom_2| 4| 2|profession_2|
|courriel_2@gmail.com| 2|nom_2| 5| 2|profession_5|
|courriel_3@gmail.com| 3|nom_3| 6| 3|profession_7|
|courriel_3@gmail.com| 3|nom_3| 7| 3|profession_2|
|courriel_4@gmail.com| 4|nom_4| 8| 4|profession_2|
|courriel_4@gmail.com| 4|nom_4| 9| 4|profession_7|
+--------------------+--------+-----+---+--------+------------+
joinResult.show();
+--------+-----+--------------------+---+------------+
|personId| nom| courriel| id| profession|
+--------+-----+--------------------+---+------------+
| 1|nom_1|courriel_1@gmail.com| 3|profession_5|
| 1|nom_1|courriel_1@gmail.com| 2|profession_2|
| 2|nom_2|courriel_2@gmail.com| 4|profession_2|
| 2|nom_2|courriel_2@gmail.com| 5|profession_5|
| 2|nom_2|courriel_2@gmail.com| 1|profession_4|
| 3|nom_3|courriel_3@gmail.com| 7|profession_2|
| 3|nom_3|courriel_3@gmail.com| 6|profession_7|
| 4|nom_4|courriel_4@gmail.com| 8|profession_2|
| 4|nom_4|courriel_4@gmail.com| 9|profession_7|
+--------+-----+--------------------+---+------------+
joinResult.printSchema();
root
|-- personId: long (nullable = false)
|-- nom: string (nullable = true)
|-- courriel: string (nullable = true)
|-- id: long (nullable = false)
|-- profession: string (nullable = true)