如何为 mrJobs 中的多行输入编写自定义协议
how to write a custom protocol for multiple line input into mrJobs
我正在尝试将 mrJobs 与 csv 文件一起使用。问题是 csv 文件的输入跨越多行。
通过 mrJob 文档搜索,我认为我需要编写一个自定义协议来处理输入。
我尝试在下面编写自己的协议,multiLineCsvInputProtocol
,但我已经遇到错误:TypeError: a bytes-like object is required, not 'str'
我不会说谎,我想我已经无法理解了。
基本上,多行 csv 文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,在逗号上吐出每一行,将值存储在列表中,每当新行以日期字符串开头时,我想 yield
整个列表到第一个映射器。
(或者找到其他更好的方法来读取多行 csv 输入)
谁能帮我解决这个错误?
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol
class multiLineCsvInputProtocol(object):
def read(self, line):
key, val = enumerate(line.split(',', 1))
return key, val
class someTask(MRJob):
INPUT_PROTOCOL = multiLineCsvInputProtocol
def mapper1(self,_, row):
yield (row, 1 )
if __name__ == '__main__':
MRFindReciprocal.run()
根据mrjob
的documentation,read函数的line参数有bytestring 的类型,您很可能会收到该错误,因为您 split-ting by ','
这是一个 str:
Writing custom protocols
A protocol is an object with methods read(self, line) and write(self,
key, value). The read() method takes a bytestring and returns a
2-tuple of decoded objects, and write() takes the key and value and
returns bytes to be passed back to Hadoop Streaming or as output.
可能的解决方案:
- 您可以尝试按
b','
拆分,这是一个 bytestring
- 你可以在分割前解码行,像这样:
line.decode().split(',', 1)
(指定编码可能是个好主意)
我正在尝试将 mrJobs 与 csv 文件一起使用。问题是 csv 文件的输入跨越多行。
通过 mrJob 文档搜索,我认为我需要编写一个自定义协议来处理输入。
我尝试在下面编写自己的协议,multiLineCsvInputProtocol
,但我已经遇到错误:TypeError: a bytes-like object is required, not 'str'
我不会说谎,我想我已经无法理解了。
基本上,多行 csv 文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,在逗号上吐出每一行,将值存储在列表中,每当新行以日期字符串开头时,我想 yield
整个列表到第一个映射器。
(或者找到其他更好的方法来读取多行 csv 输入)
谁能帮我解决这个错误?
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol
class multiLineCsvInputProtocol(object):
def read(self, line):
key, val = enumerate(line.split(',', 1))
return key, val
class someTask(MRJob):
INPUT_PROTOCOL = multiLineCsvInputProtocol
def mapper1(self,_, row):
yield (row, 1 )
if __name__ == '__main__':
MRFindReciprocal.run()
根据mrjob
的documentation,read函数的line参数有bytestring 的类型,您很可能会收到该错误,因为您 split-ting by ','
这是一个 str:
Writing custom protocols
A protocol is an object with methods read(self, line) and write(self, key, value). The read() method takes a bytestring and returns a 2-tuple of decoded objects, and write() takes the key and value and returns bytes to be passed back to Hadoop Streaming or as output.
可能的解决方案:
- 您可以尝试按
b','
拆分,这是一个 bytestring - 你可以在分割前解码行,像这样:
line.decode().split(',', 1)
(指定编码可能是个好主意)