iterate/stream Spark Dataframe 的最佳方式
Best way to iterate/stream a Spark Dataframe
我有一个 Spark 作业,它读取一个包含大约 150.000.000 key/value 个条目的镶木地板文件。
SparkConf conf = new SparkConf();
conf.setAppName("Job");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sql = new SQLContext(jsc);
DataFrame df = sql.read().parquet(path);
我的 objective 是将 key/value 对写入 HBase,但我遇到了堆内存问题,我怀疑这不是最好的方法。我想将计算推向集群,但我不知道如何跳过收集部分。现在我的代码如下所示:
HBaseClient client = HbaseWrapper.initClient();
df.collectAsList().stream().forEach(row -> {
try {
HbaseWrapper.putRows(client, row);
} catch (Exception e) {
e.printStackTrace();
}
});
jsc.stop();
我试着先收集一个列表而不是流媒体,然后把它写下来,但这也需要很长时间。
任何见解表示赞赏。
您收到 OOM 错误,因为 collectAsList 将所有数据发送到驱动程序。
为了解决你可以使用 foreachPartitions,所以你将并行流式传输到 Hbase。
df.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
@Override
public void call(Iterator<Row> t) throws Exception {
try {
HBaseClient client = HbaseWrapper.initClient();
while(t.hasNext()){
Row row = t.next();
HbaseWrapper.putRows(client, row);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
我有一个 Spark 作业,它读取一个包含大约 150.000.000 key/value 个条目的镶木地板文件。
SparkConf conf = new SparkConf();
conf.setAppName("Job");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sql = new SQLContext(jsc);
DataFrame df = sql.read().parquet(path);
我的 objective 是将 key/value 对写入 HBase,但我遇到了堆内存问题,我怀疑这不是最好的方法。我想将计算推向集群,但我不知道如何跳过收集部分。现在我的代码如下所示:
HBaseClient client = HbaseWrapper.initClient();
df.collectAsList().stream().forEach(row -> {
try {
HbaseWrapper.putRows(client, row);
} catch (Exception e) {
e.printStackTrace();
}
});
jsc.stop();
我试着先收集一个列表而不是流媒体,然后把它写下来,但这也需要很长时间。
任何见解表示赞赏。
您收到 OOM 错误,因为 collectAsList 将所有数据发送到驱动程序。
为了解决你可以使用 foreachPartitions,所以你将并行流式传输到 Hbase。
df.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
@Override
public void call(Iterator<Row> t) throws Exception {
try {
HBaseClient client = HbaseWrapper.initClient();
while(t.hasNext()){
Row row = t.next();
HbaseWrapper.putRows(client, row);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});