Spark RDD:根据文本文件格式分区
Spark RDD: partitioning according to text file format
我有一个包含数十 GB 数据的文本文件,我需要从 HDFS 加载它并将其并行化为 RDD。此文本文件使用以下格式描述项目。请注意,字母字符串不存在(每行的含义是隐含的)并且每行可以包含空格以分隔不同的值:
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
0220 (weight)
3030 (amount)
我认为并行化此文件的最直接方法是将其从本地文件系统上传到 HDFS,然后通过执行 sc.textFile(filepath)
创建一个 RDD。但是,在这种情况下,分区将取决于与文件对应的 HDFS 拆分。
上述方法的问题是每个分区都可能包含不完整的项目。例如:
分区 1
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
分区 2
0220 (weight)
3030 (amount)
因此,当我们为每个分区调用一个方法并将其相应的数据块传递给它时,它将收到一个不完整的标识为 0002 的项目的规范。这将导致内部执行的计算输出错误被调用的方法。
为了避免这个问题,对这个 RDD 进行分区或重新分区的最有效方法是什么?能否指定每个分区的行数为4的倍数?如果是,应该用Hadoop还是Spark来做?
为什么不在将文件放入 HDFS 之前简单地对行进行分组以避免此问题?
xargs -L4 echo < file
hdfs dfs -put file /your/path
您的数据将如下所示
0001 1000 0100 0030
0002 1110 0220 3030
如果这样做,您可以使用更优化的 Spark DataFrames API 读取数据
比 RDD 更丰富 API 和性能来编写您的应用程序。
加载文本文件得到RDD[String]
然后使用zipWithIndex转换为RDD[(String, Long)]
其中元组中的第二个属性是元素在RDD中的索引号。
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
- 使用索引作为行号(从 0 开始)我们可以将属于一条记录的行分组。例如。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...
- 因为我们知道每条记录跨越(恰好)4 行,索引除以 4 的整数除法(让我们称之为 idx_div)。这将导致前四行具有 0 作为
idx_div
,接下来的四行将获得 1 作为 idx_div
等等。例如。 [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, ...
。这可用于将属于一条记录的所有(四)行分组,以便进一步解析和处理
case class Record(id:String, dimensions:String, weight:String, amount:String)
val lines = sc.textFile("...")
val records = lines
.zipWithIndex
.groupBy(line_with_idx => (line_with_idx._2 / 4)) // groupBy idx_div
.map(grouped_record => {
val (idx_div:Long, lines_with_idx:Iterable[(String, Long)]) = grouped_record
val lines_with_idx_list = lines_with_idx.toList.sortBy(_._2) // Additional check to ensure ordering
val lines_list = lines_with_idx_list.map(_._1)
val List(id:String, dimensions:String, weight:String, amount:String) = lines_list
new Record(id, dimensions, weight, amount)
})
我有一个包含数十 GB 数据的文本文件,我需要从 HDFS 加载它并将其并行化为 RDD。此文本文件使用以下格式描述项目。请注意,字母字符串不存在(每行的含义是隐含的)并且每行可以包含空格以分隔不同的值:
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
0220 (weight)
3030 (amount)
我认为并行化此文件的最直接方法是将其从本地文件系统上传到 HDFS,然后通过执行 sc.textFile(filepath)
创建一个 RDD。但是,在这种情况下,分区将取决于与文件对应的 HDFS 拆分。
上述方法的问题是每个分区都可能包含不完整的项目。例如:
分区 1
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
分区 2
0220 (weight)
3030 (amount)
因此,当我们为每个分区调用一个方法并将其相应的数据块传递给它时,它将收到一个不完整的标识为 0002 的项目的规范。这将导致内部执行的计算输出错误被调用的方法。
为了避免这个问题,对这个 RDD 进行分区或重新分区的最有效方法是什么?能否指定每个分区的行数为4的倍数?如果是,应该用Hadoop还是Spark来做?
为什么不在将文件放入 HDFS 之前简单地对行进行分组以避免此问题?
xargs -L4 echo < file
hdfs dfs -put file /your/path
您的数据将如下所示
0001 1000 0100 0030
0002 1110 0220 3030
如果这样做,您可以使用更优化的 Spark DataFrames API 读取数据 比 RDD 更丰富 API 和性能来编写您的应用程序。
加载文本文件得到RDD[String]
然后使用zipWithIndex转换为RDD[(String, Long)]
其中元组中的第二个属性是元素在RDD中的索引号。
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
- 使用索引作为行号(从 0 开始)我们可以将属于一条记录的行分组。例如。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...
- 因为我们知道每条记录跨越(恰好)4 行,索引除以 4 的整数除法(让我们称之为 idx_div)。这将导致前四行具有 0 作为
idx_div
,接下来的四行将获得 1 作为idx_div
等等。例如。[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, ...
。这可用于将属于一条记录的所有(四)行分组,以便进一步解析和处理
case class Record(id:String, dimensions:String, weight:String, amount:String)
val lines = sc.textFile("...")
val records = lines
.zipWithIndex
.groupBy(line_with_idx => (line_with_idx._2 / 4)) // groupBy idx_div
.map(grouped_record => {
val (idx_div:Long, lines_with_idx:Iterable[(String, Long)]) = grouped_record
val lines_with_idx_list = lines_with_idx.toList.sortBy(_._2) // Additional check to ensure ordering
val lines_list = lines_with_idx_list.map(_._1)
val List(id:String, dimensions:String, weight:String, amount:String) = lines_list
new Record(id, dimensions, weight, amount)
})