Python 需要 3 个输入文件的 MapReduce Hadoop 流作业?
Python MapReduce Hadoop Streaming Job that requires 3 input files?
我有 3 个小样本输入文件(实际文件要大得多),
# File Name: books.txt
# File Format: BookID|Title
1|The Hunger Games
2|To Kill a Mockingbird
3|Pride and Prejudice
4|Animal Farm
# File Name: ratings.txt
# File Format: ReaderID|BookID|Rating
101|1|1
102|2|2
103|3|3
104|4|4
105|1|5
106|2|1
107|3|2
108|4|3
# File Name: readers.txt
# File Format: ReaderID|Gender|PostCode|PreferComms
101|M|1000|email
102|F|1001|mobile
103|M|1002|email
104|F|1003|mobile
105|M|1004|email
106|F|1005|mobile
107|M|1006|email
108|F|1007|mobile
我想创建一个 Python MapReduce Hadoop Streaming Job 以获得以下输出,即按标题按性别的平均评分
Animal Farm F 3.5
Pride and Prejudice M 2.5
The Hunger Games M 3
To Kill a Mockingbird F 1.5
我搜索了这个论坛,有人指出 solution 但它是针对 2 个输入文件而不是 3 个。我试了一下,但卡在了映射器部分,因为我无法对其进行排序正确以便缩减程序可以正确识别标题和性别的第一条记录,然后开始聚合。我的映射器代码如下,
#!/usr/bin/env python
import sys
for line in sys.stdin:
try:
ReaderID = "-1"
BookID = "-1"
Title = "-1"
Gender = "-1"
Rating = "-1"
line = line.strip()
splits = line.split("|")
if len(splits) == 2:
BookID = splits[0]
Title = splits[1]
elif len(splits) == 3:
ReaderID = splits[0]
BookID = splits[1]
Rating = splits[2]
else:
ReaderID = splits[0]
Gender = splits[1]
print('%s\t%s\t%s\t%s\t%s' % (BookID, Title, ReaderID, Rating, Gender))
except:
pass
PS:我只需要使用 Python 和 Hadoop Streaming。不允许安装 Python 软件包,如 Dumbo、mrjob 等
提前感谢您的帮助。
谢谢,
大堂
查看了一些核心 Java MR 并且都建议,这三个文件不能在单个地图作业中合并在一起。我们必须先连接前两个,结果应该与第三个连接。将你的逻辑应用于这三个,并没有给我很好的结果。因此,我尝试使用 Pandas,它似乎给出了有希望的结果。如果使用 pandas 对您来说不是限制,请尝试我的代码。否则,我们将尝试用 Python 字典和列表连接这三个文件。
这是我建议的代码。我刚刚连接了所有输入来测试它。在您的代码中,只需注释我的 for 循环(第 36 行)并取消注释您的 for 循环(第 35 行)。
import pandas as pd
import sys
input_string_book = [
"1|The Hunger Games",
"2|To Kill a Mockingbird",
"3|Pride and Prejudice",
"4|Animal Farm"]
input_string_book_df = pd.DataFrame(columns=('BookID','Title'))
input_string_rating = [
"101|1|1",
"102|2|2",
"103|3|3",
"104|4|4",
"105|1|5",
"106|2|1",
"107|3|2",
"108|4|3"]
input_string_rating_df = pd.DataFrame(columns=('ReaderID','BookID','Rating'))
input_string_reader = [
"101|M|1000|email",
"102|F|1001|mobile",
"103|M|1002|email",
"104|F|1003|mobile",
"105|M|1004|email",
"106|F|1005|mobile",
"107|M|1006|email",
"108|F|1007|mobile"]
input_string_reader_df = pd.DataFrame(columns=('ReaderID','Gender','PostCode','PreferComms'))
#for line in sys.stdin:
for line in input_string_book + input_string_rating + input_string_reader:
try:
line = line.strip()
splits = line.split("|")
if len(splits) == 2:
input_string_book_df = input_string_book_df.append(pd.DataFrame([[splits[0],splits[1]]],columns=('BookID','Title')))
elif len(splits) == 3:
input_string_rating_df = input_string_rating_df.append(pd.DataFrame([[splits[0],splits[1],splits[2]]],columns=('ReaderID','BookID','Rating')))
else:
input_string_reader_df = input_string_reader_df.append(pd.DataFrame([[splits[0],splits[1],splits[2],splits[3]]]
,columns=('ReaderID','Gender','PostCode','PreferComms')))
except:
raise
l_concat_1 = input_string_book_df.merge(input_string_rating_df,on='BookID',how='inner')
l_concat_2 = l_concat_1.merge(input_string_reader_df,on='ReaderID',how='inner')
for each_iter in l_concat_2[['BookID', 'Title', 'ReaderID', 'Rating', 'Gender']].iterrows():
print('%s\t%s\t%s\t%s\t%s' % (each_iter[1][0], each_iter[1][1], each_iter[1][2], each_iter[1][3], each_iter[1][4]))
输出
1 The Hunger Games 101 1 M
1 The Hunger Games 105 5 M
2 To Kill a Mockingbird 102 2 F
2 To Kill a Mockingbird 106 1 F
3 Pride and Prejudice 103 3 M
3 Pride and Prejudice 107 2 M
4 Animal Farm 104 4 F
4 Animal Farm 108 3 F
我有 3 个小样本输入文件(实际文件要大得多),
# File Name: books.txt
# File Format: BookID|Title
1|The Hunger Games
2|To Kill a Mockingbird
3|Pride and Prejudice
4|Animal Farm
# File Name: ratings.txt
# File Format: ReaderID|BookID|Rating
101|1|1
102|2|2
103|3|3
104|4|4
105|1|5
106|2|1
107|3|2
108|4|3
# File Name: readers.txt
# File Format: ReaderID|Gender|PostCode|PreferComms
101|M|1000|email
102|F|1001|mobile
103|M|1002|email
104|F|1003|mobile
105|M|1004|email
106|F|1005|mobile
107|M|1006|email
108|F|1007|mobile
我想创建一个 Python MapReduce Hadoop Streaming Job 以获得以下输出,即按标题按性别的平均评分
Animal Farm F 3.5
Pride and Prejudice M 2.5
The Hunger Games M 3
To Kill a Mockingbird F 1.5
我搜索了这个论坛,有人指出 solution 但它是针对 2 个输入文件而不是 3 个。我试了一下,但卡在了映射器部分,因为我无法对其进行排序正确以便缩减程序可以正确识别标题和性别的第一条记录,然后开始聚合。我的映射器代码如下,
#!/usr/bin/env python
import sys
for line in sys.stdin:
try:
ReaderID = "-1"
BookID = "-1"
Title = "-1"
Gender = "-1"
Rating = "-1"
line = line.strip()
splits = line.split("|")
if len(splits) == 2:
BookID = splits[0]
Title = splits[1]
elif len(splits) == 3:
ReaderID = splits[0]
BookID = splits[1]
Rating = splits[2]
else:
ReaderID = splits[0]
Gender = splits[1]
print('%s\t%s\t%s\t%s\t%s' % (BookID, Title, ReaderID, Rating, Gender))
except:
pass
PS:我只需要使用 Python 和 Hadoop Streaming。不允许安装 Python 软件包,如 Dumbo、mrjob 等
提前感谢您的帮助。
谢谢, 大堂
查看了一些核心 Java MR 并且都建议,这三个文件不能在单个地图作业中合并在一起。我们必须先连接前两个,结果应该与第三个连接。将你的逻辑应用于这三个,并没有给我很好的结果。因此,我尝试使用 Pandas,它似乎给出了有希望的结果。如果使用 pandas 对您来说不是限制,请尝试我的代码。否则,我们将尝试用 Python 字典和列表连接这三个文件。
这是我建议的代码。我刚刚连接了所有输入来测试它。在您的代码中,只需注释我的 for 循环(第 36 行)并取消注释您的 for 循环(第 35 行)。
import pandas as pd
import sys
input_string_book = [
"1|The Hunger Games",
"2|To Kill a Mockingbird",
"3|Pride and Prejudice",
"4|Animal Farm"]
input_string_book_df = pd.DataFrame(columns=('BookID','Title'))
input_string_rating = [
"101|1|1",
"102|2|2",
"103|3|3",
"104|4|4",
"105|1|5",
"106|2|1",
"107|3|2",
"108|4|3"]
input_string_rating_df = pd.DataFrame(columns=('ReaderID','BookID','Rating'))
input_string_reader = [
"101|M|1000|email",
"102|F|1001|mobile",
"103|M|1002|email",
"104|F|1003|mobile",
"105|M|1004|email",
"106|F|1005|mobile",
"107|M|1006|email",
"108|F|1007|mobile"]
input_string_reader_df = pd.DataFrame(columns=('ReaderID','Gender','PostCode','PreferComms'))
#for line in sys.stdin:
for line in input_string_book + input_string_rating + input_string_reader:
try:
line = line.strip()
splits = line.split("|")
if len(splits) == 2:
input_string_book_df = input_string_book_df.append(pd.DataFrame([[splits[0],splits[1]]],columns=('BookID','Title')))
elif len(splits) == 3:
input_string_rating_df = input_string_rating_df.append(pd.DataFrame([[splits[0],splits[1],splits[2]]],columns=('ReaderID','BookID','Rating')))
else:
input_string_reader_df = input_string_reader_df.append(pd.DataFrame([[splits[0],splits[1],splits[2],splits[3]]]
,columns=('ReaderID','Gender','PostCode','PreferComms')))
except:
raise
l_concat_1 = input_string_book_df.merge(input_string_rating_df,on='BookID',how='inner')
l_concat_2 = l_concat_1.merge(input_string_reader_df,on='ReaderID',how='inner')
for each_iter in l_concat_2[['BookID', 'Title', 'ReaderID', 'Rating', 'Gender']].iterrows():
print('%s\t%s\t%s\t%s\t%s' % (each_iter[1][0], each_iter[1][1], each_iter[1][2], each_iter[1][3], each_iter[1][4]))
输出
1 The Hunger Games 101 1 M
1 The Hunger Games 105 5 M
2 To Kill a Mockingbird 102 2 F
2 To Kill a Mockingbird 106 1 F
3 Pride and Prejudice 103 3 M
3 Pride and Prejudice 107 2 M
4 Animal Farm 104 4 F
4 Animal Farm 108 3 F