如何在 HDFS hadoop Map-Reduce 中处理增量更新

How to handle Incremental Update in HDFS hadoop Map-Reduce

我在 HDF 中有结构化的基本文本文件,其中包含这样的数据(在 file.txt 中):

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|

file.txt 的大小是 30 GB。

我有增量数据 file1.txt,大小约为 2 GB,在 HFDS 中以相同的格式出现,如下所示:

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|

现在我必须合并 file.txt 和 file1.txt 并创建一个包含所有唯一记录的最终文本文件。

两个文件中的key都是OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。

最终输出是这样的。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|

我如何在 mapreduce 中做到这一点?

我不打算使用 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000 个,所以我必须在 HIVE 中创建 10.000 个分区。

对此用例使用 Spark 有什么建议吗?

我建议您在 scala 中为 spark 编程。如果您在 mapreduce 中编程,它将仅对 hadoop 有用,但在 scala 中为 spark 编程将使您能够在 sparkhadoopSpark 是为了解决 mapreduce 模型中的缺点而发起的。您可以找到有关此主题的许多资源。其中之一是 this

关于你的问题,我建议你使用dataframe

第一个任务是为数据帧创建 schema

val schema = StructType(Array(StructField("OgId", StringType),
  StructField("ItemId", StringType),
  StructField("segmentId", StringType),
  StructField("Sequence", StringType),
  StructField("Action", StringType)))

下一个任务是读取这两个文件并使用上述模式创建数据框

import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

df1的输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136   |4        |1       |I     |
|4295877346|136   |4        |1       |I     |
|4295877341|138   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877341|145   |14       |1       |I     |
+----------+------+---------+--------+------+

df2 的输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

现在根据您的要求,如果 OgIddf2 匹配,您希望从 df1 中删除 rows,并将所有 df2 附加到 df1。这些要求可以按如下方式完成

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)

最终输出为

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136   |4        |1       |I     |
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

这个最终结果可以在hdfs中保存为

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")

希望对您有所帮助

注意:确保输入和输出位置的路径正确