如何提高 Spark 性能?
How to improve Spark performance?
我有 Java 处理大型数据集的程序。数据集存储在 hdfs (csv) 中。
程序运行良好,但速度很慢。
程序的作用:
- 加载 csv 文件
- 字符串单独一行[]
- 筛选字符串数组
- 映射到 MyObject
- 将 MyObject 保存到 Cassandra
这是我的主要方法:
public static void main(String[] args) {
// configure spark
SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
.setMaster("local[*]")
.set("spark.executor.memory", "4g");
if (args.length > 1)
sparkConf.set("spark.cassandra.connection.host", args[1]);
// start a spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// read text file to RDD
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<MyObject> myObjectJavaRDD = lines
.map(line -> line.split(","))
.filter(someFilter)
.map(MyObject::new);
javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
}
我怎样才能提高性能?
感谢您的回答。
您的代码没有随机播放问题(除非您必须写出到 HDFS)并且默认分区由输入格式定义,在 Hadoop 上由 HDFS 核心和过滤器拆分或映射不更改分区。如果你能先过滤,你会看到一些改进
JavaRDD<MyObject> myObjectJavaRDD = lines
.filter(someFilter)
.map(line -> line.split(","))
.map(MyObject::new);
Spark 只能 运行 为一个 RDD 的每个分区分配 1 个并发任务,最多
集群中的核心。所以如果你有一个有 50 个核心的集群,你希望你的 RDD 至少
有50个分区。至于选择 "good" 个分区,您通常至少需要与
并行执行器的数量。您可以通过调用
来获取此计算值
sc.defaultParallelism
或通过
检查RDD分区数
someRDD.partitions.size
使用
读取文件创建 RDD 时
rdd = SparkContext().textFile("hdfs://…/file.txt")
分区数可能会少一些。理想情况下,你会得到相同的
您在 HDFS 中看到的块数,但是如果文件中的行太长(长于
块大小),分区会更少。
为RDD设置分区数的首选方法是直接将其作为
调用中的第二个输入参数,如
rdd = sc.textFile("hdfs://… /file.txt", 400)
其中 400 是分区数。在这种情况下,分区会进行 400 次拆分
由 Hadoop 的 TextInputFormat 完成,而不是 Spark,它会工作得更快。它的
另外,代码会生成 400 个并发任务以尝试将 file.txt 直接加载到 400
分区。
重新分区:增加分区,在过滤器增加并行度后重新平衡分区
repartition(numPartitions: Int)
Coalesce:在输出到 HDFS/external
之前减少分区而不随机合并
coalesce(numPartitions: Int, suffle: Boolean = false)
最后,同样重要的是,您可以使用不同的值和基准进行一些试验,看看该过程花费了多少时间
val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start
println(s"My App takes: $time")
我希望,它有帮助
我有 Java 处理大型数据集的程序。数据集存储在 hdfs (csv) 中。
程序运行良好,但速度很慢。
程序的作用:
- 加载 csv 文件
- 字符串单独一行[]
- 筛选字符串数组
- 映射到 MyObject
- 将 MyObject 保存到 Cassandra
这是我的主要方法:
public static void main(String[] args) {
// configure spark
SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
.setMaster("local[*]")
.set("spark.executor.memory", "4g");
if (args.length > 1)
sparkConf.set("spark.cassandra.connection.host", args[1]);
// start a spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// read text file to RDD
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<MyObject> myObjectJavaRDD = lines
.map(line -> line.split(","))
.filter(someFilter)
.map(MyObject::new);
javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
}
我怎样才能提高性能?
感谢您的回答。
您的代码没有随机播放问题(除非您必须写出到 HDFS)并且默认分区由输入格式定义,在 Hadoop 上由 HDFS 核心和过滤器拆分或映射不更改分区。如果你能先过滤,你会看到一些改进
JavaRDD<MyObject> myObjectJavaRDD = lines
.filter(someFilter)
.map(line -> line.split(","))
.map(MyObject::new);
Spark 只能 运行 为一个 RDD 的每个分区分配 1 个并发任务,最多 集群中的核心。所以如果你有一个有 50 个核心的集群,你希望你的 RDD 至少 有50个分区。至于选择 "good" 个分区,您通常至少需要与 并行执行器的数量。您可以通过调用
来获取此计算值sc.defaultParallelism
或通过
检查RDD分区数someRDD.partitions.size
使用
读取文件创建 RDD 时rdd = SparkContext().textFile("hdfs://…/file.txt")
分区数可能会少一些。理想情况下,你会得到相同的 您在 HDFS 中看到的块数,但是如果文件中的行太长(长于 块大小),分区会更少。
为RDD设置分区数的首选方法是直接将其作为 调用中的第二个输入参数,如
rdd = sc.textFile("hdfs://… /file.txt", 400)
其中 400 是分区数。在这种情况下,分区会进行 400 次拆分 由 Hadoop 的 TextInputFormat 完成,而不是 Spark,它会工作得更快。它的 另外,代码会生成 400 个并发任务以尝试将 file.txt 直接加载到 400 分区。
重新分区:增加分区,在过滤器增加并行度后重新平衡分区
repartition(numPartitions: Int)
Coalesce:在输出到 HDFS/external
之前减少分区而不随机合并 coalesce(numPartitions: Int, suffle: Boolean = false)
最后,同样重要的是,您可以使用不同的值和基准进行一些试验,看看该过程花费了多少时间
val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start
println(s"My App takes: $time")
我希望,它有帮助