使用 Hadoop Streaming 和 MapReduce 处理来自 CommonCrawl 的许多 WARC 档案

Processing many WARC archives from CommonCrawl using Hadoop Streaming and MapReduce

我正在开展一个项目,我需要从 S3 容器下载特定 URL 的爬网数据(从 CommonCrawl),然后处理该数据。

目前我有一个 MapReduce 作业(Python 通过 Hadoop Streaming),它获取 URL 列表的正确 S3 文件路径。然后我尝试使用第二个 MapReduce 作业通过从 commoncrawl S3 存储桶下载数据来处理此输出。在映射器中,我使用 boto3 从 commoncrawl S3 存储桶下载特定 URL 的 gzip 内容,然后输出有关 gzip 内容的一些信息(单词计数器信息、内容长度、URLs 链接等)。 reducer 然后通过这个输出得到最终的字数,URL 列表等

第一个 MapReduce 作业的输出文件大小只有 6mb(但一旦我们扩展到完整数据集,将会更大)。当我运行第二个MapReduce时,这个文件只被分割了两次。通常这对于这么小的文件来说不是问题,但是我上面描述的映射器代码(获取 S3 数据,吐出映射输出等)每个 URL 需要一段时间 运行。由于文件仅拆分两次,因此只有 2 个映射器 运行。我需要增加拆分的数量,以便更快地完成映射。

我已尝试为 MapReduce 作业设置 "mapreduce.input.fileinputformat.split.maxsize" 和 "mapreduce.input.fileinputformat.split.minsize",但它不会改变发生的拆分次数。

这是映射器的一些代码:

s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1

gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
        'Body'].read()

fileobj = io.BytesIO(gz_file)

with gzip.open(fileobj, 'rb') as file:
    [do stuff]

我还手动将输入文件拆分为多个文件,最多 100 行。这具有给我更多映射器的预期效果,但随后我开始遇到来自 s3client.get_object() 调用的 ConnectionError:

Traceback (most recent call last):
  File "dmapper.py", line 103, in <module>
    commoncrawl_reader(base_url, full_url, offset, length, warc_file)
  File "dmapper.py", line 14, in commoncrawl_reader
    gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
    operation_model, request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
    success_response, exception):
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
    return self._emit(event_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
    response = handler(**kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
    proxies=self.proxies, timeout=self.timeout)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
    raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

我目前 运行 只用少数 URL 来解决这个问题,但是一旦我开始工作,我将需要用几千个(每个都有许多子目录)来完成它。

我不确定从哪里开始解决这个问题。我觉得很可能有比我正在尝试的方法更好的方法。映射器似乎为每个 URL 花费这么长时间这一事实似乎表明我正在接近这个错误。我还应该提到映射器和缩减器都 运行 正确如果 运行 直接作为管道命令:

"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> 生成所需的输出,但在 URL 的整个列表中 运行 花费的时间太长。

任何指导将不胜感激。

MapReduce API 提供 NLineInputFormat. The property "mapreduce.input.lineinputformat.linespermap" allows to control how many lines (here WARC records) are passed to a mapper at maximum. It works with mrjob, cf. Ilya's WARC indexer.

关于S3连接错误:最好运行数据所在的us-east-1 AWS区域的作业