如何在 HDFS hadoop Map-Reduce 中处理增量更新
How to handle Incremental Update in HDFS hadoop Map-Reduce
我在 HDF 中有结构化的基本文本文件,其中包含这样的数据(在 file.txt 中):
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|
file.txt 的大小是 30 GB。
我有增量数据 file1.txt,大小约为 2 GB,在 HFDS 中以相同的格式出现,如下所示:
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
现在我必须合并 file.txt 和 file1.txt 并创建一个包含所有唯一记录的最终文本文件。
两个文件中的key都是OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。
最终输出是这样的。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
我如何在 mapreduce 中做到这一点?
我不打算使用 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000 个,所以我必须在 HIVE 中创建 10.000 个分区。
对此用例使用 Spark 有什么建议吗?
我建议您在 scala
中为 spark
编程。如果您在 mapreduce
中编程,它将仅对 hadoop
有用,但在 scala
中为 spark
编程将使您能够在 spark
和 hadoop
。 Spark
是为了解决 mapreduce
模型中的缺点而发起的。您可以找到有关此主题的许多资源。其中之一是 this
关于你的问题,我建议你使用dataframe
第一个任务是为数据帧创建 schema
。
val schema = StructType(Array(StructField("OgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))
下一个任务是读取这两个文件并使用上述模式创建数据框
import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
df1
的输出是
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
而 df2
的输出是
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
现在根据您的要求,如果 OgId
与 df2
匹配,您希望从 df1
中删除 rows
,并将所有 df2
附加到 df1
。这些要求可以按如下方式完成
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)
最终输出为
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
这个最终结果可以在hdfs
中保存为
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
希望对您有所帮助
注意:确保输入和输出位置的路径正确
我在 HDF 中有结构化的基本文本文件,其中包含这样的数据(在 file.txt 中):
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|
file.txt 的大小是 30 GB。
我有增量数据 file1.txt,大小约为 2 GB,在 HFDS 中以相同的格式出现,如下所示:
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
现在我必须合并 file.txt 和 file1.txt 并创建一个包含所有唯一记录的最终文本文件。
两个文件中的key都是OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。
最终输出是这样的。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
我如何在 mapreduce 中做到这一点?
我不打算使用 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000 个,所以我必须在 HIVE 中创建 10.000 个分区。
对此用例使用 Spark 有什么建议吗?
我建议您在 scala
中为 spark
编程。如果您在 mapreduce
中编程,它将仅对 hadoop
有用,但在 scala
中为 spark
编程将使您能够在 spark
和 hadoop
。 Spark
是为了解决 mapreduce
模型中的缺点而发起的。您可以找到有关此主题的许多资源。其中之一是 this
关于你的问题,我建议你使用dataframe
第一个任务是为数据帧创建 schema
。
val schema = StructType(Array(StructField("OgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))
下一个任务是读取这两个文件并使用上述模式创建数据框
import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
df1
的输出是
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
而 df2
的输出是
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
现在根据您的要求,如果 OgId
与 df2
匹配,您希望从 df1
中删除 rows
,并将所有 df2
附加到 df1
。这些要求可以按如下方式完成
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)
最终输出为
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
这个最终结果可以在hdfs
中保存为
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
希望对您有所帮助
注意:确保输入和输出位置的路径正确