无法运行大型数据集的 Spark 作业
Cannot run Spark jobs for large datasets
我编写了一个 Spark 作业来读取 S3 中的 Hive 数据并生成 HFile。
这项工作在只读取一个 ORC 文件(大约 190MB)时工作正常;但是,当我用它读取整个 S3 目录(大约 400 个 ORC 文件,约 400 × 190MB = 76GB 数据)时,它一直抛出下面的 error/stacktrace:
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39149; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/06/12 01:48:03 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-211-127-63.ap-northeast-2.compute.internal, executor 9): java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
17/06/12 01:48:03 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 0.0 (TID 541, ip-10-211-126-250.ap-northeast-2.compute.internal, executor 72, partition 6, PROCESS_LOCAL, 6680 bytes)
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39151; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
我的集群足够大来处理它:(这已经被验证)
它有 40 个节点,超过 800 GB 可用内存,320 个 VCore。
这是我的 Java 代码:
/**
 * Transforms the input rows into HBase (rowkey, KeyValue) pairs, sorts them,
 * and writes HFiles suitable for bulk loading into the target HBase table.
 *
 * @param rdd rows where column 0 is the row key and column 1 is the Solr JSON value
 * @throws IOException if HBase table/job configuration or the HFile write fails
 */
protected void sparkGenerateHFiles(JavaRDD<Row> rdd) throws IOException {
    System.out.println("In sparkGenerateHFiles....");
    JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rdd.mapToPair(
        new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
                // NOTE: do not println per record here — on ~76GB of input that
                // floods executor stdout and slows the job significantly.
                String key = (String) row.get(0);
                String value = (String) row.get(1);
                byte[] rowKeyBytes = Bytes.toBytes(key);
                ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rowKeyBytes);
                KeyValue keyValue = new KeyValue(rowKeyBytes,
                    Bytes.toBytes("fam"),
                    Bytes.toBytes("qualifier"),
                    ProductJoin.newBuilder()
                        .setId(key)
                        .setSolrJson(value)
                        .build().toByteArray());
                return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
            }
        });

    Partitioner partitioner = new IntPartitioner(2);
    // Repartition and sort within partitions — HFileOutputFormat2 requires
    // its input to be sorted by row key.
    JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRDD =
        javaPairRDD.repartitionAndSortWithinPartitions(partitioner);

    Configuration baseConf = HBaseConfiguration.create();
    Configuration conf = new Configuration();
    conf.set(HBASE_ZOOKEEPER_QUORUM, importerParams.zkQuorum);
    Job job = new Job(baseConf, "map data");
    // The table handle is only needed while configuring the incremental load;
    // close it to avoid leaking the underlying connection.
    try (HTable table = new HTable(conf, importerParams.hbaseTargetTable)) {
        System.out.println("gpt table: " + table.getName());
        HFileOutputFormat2.configureIncrementalLoad(job, table);
    }
    System.out.println("Done configuring incremental load....");

    Configuration config = job.getConfiguration();
    repartitionedRDD.saveAsNewAPIHadoopFile(
        // BUG FIX: was the string literal "HFILE_OUTPUT_PATH", which wrote to a
        // directory literally named HFILE_OUTPUT_PATH instead of the configured path.
        HFILE_OUTPUT_PATH,
        ImmutableBytesWritable.class,
        KeyValue.class,
        HFileOutputFormat2.class,
        config
    );
    System.out.println("Saved to HFILE_OUTPUT_PATH: " + HFILE_OUTPUT_PATH);
}
/**
 * Builds (or reuses) the SparkSession and loads the source ORC data from S3.
 *
 * In local mode the session runs with master "local" and reads the dev sample
 * table; otherwise it reads {@code importerParams.hiveSourceTable}.
 *
 * @return the loaded table as a JavaRDD of rows
 */
protected JavaRDD<Row> readJsonTable() {
    System.out.println("In readJsonTable.....");
    SparkSession.Builder builder = SparkSession.builder().appName("Importer");
    final String hiveTable;
    if (importerParams.local) {
        builder.master("local");
        hiveTable = HIVE_TABLE_S3A_DEV_SAMPLE;
    } else {
        hiveTable = importerParams.hiveSourceTable;
    }
    SparkSession spark = builder.getOrCreate();
    // The S3A endpoint must match the bucket's region (ap-northeast-2),
    // otherwise reads fail or are redirected.
    spark.sparkContext().hadoopConfiguration()
        .set("fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com");
    // Both branches previously issued the identical load(hiveTable) call, and
    // the subsequent null guard was dead code (load() never returns null) —
    // collapsed into a single read.
    Dataset<Row> rows = spark.read().format("orc").load(hiveTable);
    // NOTE: count() triggers a full scan of the dataset purely for this log
    // line; consider removing it for large inputs.
    System.out.println("Finished loading hive table from S3, rows.count() = " + rows.count());
    return rows.toJavaRDD();
}
主程序:
// Measure end-to-end wall-clock time for the whole import.
long startTime = System.currentTimeMillis();
// Load the source ORC data from S3 as a JavaRDD of rows.
JavaRDD<Row> rdd = readJsonTable();
// Convert to (rowkey, KeyValue) pairs and write sorted HFiles for bulk load.
sparkGenerateHFiles(rdd);
System.out.println("it took " + (System.currentTimeMillis() - startTime)/1000 + " seconds to generate HFiles...\n\n\n\n");
我尝试过的:
我在 Stack Overflow 上看到了最接近的 post。
然后我设置了这个
builder.config("spark.shuffle.blockTransferService", "nio");
但仍然没有运气。
非常感谢任何帮助!
正如@Wang指出的那样,这确实是由于我的数据倾斜问题。
为了解决这个问题,我做的是:
我重新创建了我的 HBase table,但是这次,我使用了 SPLITS
,并将我的 HBase table 拆分为 80 个区域。
然后在我的 Spark 代码中,我写了一个自定义的 Partitioner 来根据每个条目的键重新分区,这样就不会出现 HOTSPOTTING
问题,即一个区域服务器过载而其他区域服务器闲置。
在使用 SPLITS 创建 HBase table 时,沿途还学到了一些其他技巧:默认情况下,第一个区域的 startkey 和最后一个区域的 endkey 都是空字符串 "",一定要在那里做正确的处理以避免 HOTSPOTTING。
这是我的 partitioner 的一个工作示例。
谢谢!
我编写了一个 Spark 作业来读取 S3 中的 Hive 数据并生成 HFile。
这项工作在只读取一个 ORC 文件(大约 190MB)时工作正常;但是,当我用它读取整个 S3 目录(大约 400 个 ORC 文件,约 400 × 190MB = 76GB 数据)时,它一直抛出下面的 error/stacktrace:
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39149; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/06/12 01:48:03 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-211-127-63.ap-northeast-2.compute.internal, executor 9): java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
17/06/12 01:48:03 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 0.0 (TID 541, ip-10-211-126-250.ap-northeast-2.compute.internal, executor 72, partition 6, PROCESS_LOCAL, 6680 bytes)
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39151; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
我的集群足够大来处理它:(这已经被验证)
它有 40 个节点,超过 800 GB 可用内存,320 个 VCore。
这是我的 Java 代码:
/**
 * Transforms the input rows into HBase (rowkey, KeyValue) pairs, sorts them,
 * and writes HFiles suitable for bulk loading into the target HBase table.
 *
 * @param rdd rows where column 0 is the row key and column 1 is the Solr JSON value
 * @throws IOException if HBase table/job configuration or the HFile write fails
 */
protected void sparkGenerateHFiles(JavaRDD<Row> rdd) throws IOException {
    System.out.println("In sparkGenerateHFiles....");
    JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rdd.mapToPair(
        new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
                // NOTE: do not println per record here — on ~76GB of input that
                // floods executor stdout and slows the job significantly.
                String key = (String) row.get(0);
                String value = (String) row.get(1);
                byte[] rowKeyBytes = Bytes.toBytes(key);
                ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rowKeyBytes);
                KeyValue keyValue = new KeyValue(rowKeyBytes,
                    Bytes.toBytes("fam"),
                    Bytes.toBytes("qualifier"),
                    ProductJoin.newBuilder()
                        .setId(key)
                        .setSolrJson(value)
                        .build().toByteArray());
                return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
            }
        });

    Partitioner partitioner = new IntPartitioner(2);
    // Repartition and sort within partitions — HFileOutputFormat2 requires
    // its input to be sorted by row key.
    JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRDD =
        javaPairRDD.repartitionAndSortWithinPartitions(partitioner);

    Configuration baseConf = HBaseConfiguration.create();
    Configuration conf = new Configuration();
    conf.set(HBASE_ZOOKEEPER_QUORUM, importerParams.zkQuorum);
    Job job = new Job(baseConf, "map data");
    // The table handle is only needed while configuring the incremental load;
    // close it to avoid leaking the underlying connection.
    try (HTable table = new HTable(conf, importerParams.hbaseTargetTable)) {
        System.out.println("gpt table: " + table.getName());
        HFileOutputFormat2.configureIncrementalLoad(job, table);
    }
    System.out.println("Done configuring incremental load....");

    Configuration config = job.getConfiguration();
    repartitionedRDD.saveAsNewAPIHadoopFile(
        // BUG FIX: was the string literal "HFILE_OUTPUT_PATH", which wrote to a
        // directory literally named HFILE_OUTPUT_PATH instead of the configured path.
        HFILE_OUTPUT_PATH,
        ImmutableBytesWritable.class,
        KeyValue.class,
        HFileOutputFormat2.class,
        config
    );
    System.out.println("Saved to HFILE_OUTPUT_PATH: " + HFILE_OUTPUT_PATH);
}
/**
 * Builds (or reuses) the SparkSession and loads the source ORC data from S3.
 *
 * In local mode the session runs with master "local" and reads the dev sample
 * table; otherwise it reads {@code importerParams.hiveSourceTable}.
 *
 * @return the loaded table as a JavaRDD of rows
 */
protected JavaRDD<Row> readJsonTable() {
    System.out.println("In readJsonTable.....");
    SparkSession.Builder builder = SparkSession.builder().appName("Importer");
    final String hiveTable;
    if (importerParams.local) {
        builder.master("local");
        hiveTable = HIVE_TABLE_S3A_DEV_SAMPLE;
    } else {
        hiveTable = importerParams.hiveSourceTable;
    }
    SparkSession spark = builder.getOrCreate();
    // The S3A endpoint must match the bucket's region (ap-northeast-2),
    // otherwise reads fail or are redirected.
    spark.sparkContext().hadoopConfiguration()
        .set("fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com");
    // Both branches previously issued the identical load(hiveTable) call, and
    // the subsequent null guard was dead code (load() never returns null) —
    // collapsed into a single read.
    Dataset<Row> rows = spark.read().format("orc").load(hiveTable);
    // NOTE: count() triggers a full scan of the dataset purely for this log
    // line; consider removing it for large inputs.
    System.out.println("Finished loading hive table from S3, rows.count() = " + rows.count());
    return rows.toJavaRDD();
}
主程序:
// Measure end-to-end wall-clock time for the whole import.
long startTime = System.currentTimeMillis();
// Load the source ORC data from S3 as a JavaRDD of rows.
JavaRDD<Row> rdd = readJsonTable();
// Convert to (rowkey, KeyValue) pairs and write sorted HFiles for bulk load.
sparkGenerateHFiles(rdd);
System.out.println("it took " + (System.currentTimeMillis() - startTime)/1000 + " seconds to generate HFiles...\n\n\n\n");
我尝试过的:
我在 Stack Overflow 上看到了最接近的 post。
然后我设置了这个
builder.config("spark.shuffle.blockTransferService", "nio");
但仍然没有运气。
非常感谢任何帮助!
正如@Wang指出的那样,这确实是由于我的数据倾斜问题。
为了解决这个问题,我做的是:
我重新创建了我的 HBase table,但是这次,我使用了 SPLITS
,并将我的 HBase table 拆分为 80 个区域。
然后在我的 Spark 代码中,我写了一个自定义的 Partitioner 来根据每个条目的键重新分区,这样就不会出现 HOTSPOTTING
问题,即一个区域服务器过载而其他区域服务器闲置。
在使用 SPLITS 创建 HBase table 时,沿途还学到了一些其他技巧:默认情况下,第一个区域的 startkey 和最后一个区域的 endkey 都是空字符串 "",一定要在那里做正确的处理以避免 HOTSPOTTING。
这是我的 partitioner 的一个工作示例。谢谢!