Обработка многих архивов WARC из CommonCrawl с использованием потоковой передачи Hadoop и MapReduce

Я работаю над проектом, в котором мне нужно загрузить данные обхода (из CommonCrawl) для конкретных URL-адресов из контейнера S3, а затем обработать эти данные.

В настоящее время у меня есть задание MapReduce (Python через Hadoop Streaming), которое получает правильные пути к файлам S3 для списка URL-адресов. Затем я пытаюсь использовать второе задание MapReduce для обработки этого вывода путем загрузки данных из корзины S3 commoncrawl. В маппере я использую boto3, чтобы загрузить содержимое gzip для определенного URL-адреса из корзины commoncrawl S3, а затем вывести некоторую информацию о содержимом gzip (информацию о счетчике слов, длину содержимого, URL-адреса, связанные и т. Д.). Затем редуктор проходит через этот вывод, чтобы получить окончательное количество слов, список URL и т. Д.

Размер выходного файла из первого задания MapReduce составляет всего около 6 МБ (но он будет больше, когда мы масштабируемся до полного набора данных). Когда я запускаю второй MapReduce, этот файл разделяется только дважды. Обычно это не проблема для такого маленького файла, но код сопоставления, который я описал выше (выборка данных S3, выделение отображенного вывода и т. Д.), Требует времени для запуска для каждого URL. Поскольку файл разбивается только дважды, выполняется только 2 сопоставления. Мне нужно увеличить количество разбиений, чтобы сопоставление можно было выполнить быстрее.

Я попытался установить "mapreduce.input.fileinputformat.split.maxsize" и "mapreduce.input.fileinputformat.split.minsize" для задания MapReduce, но это не меняет количество выполняемых разбиений.

Вот часть кода из картографа:

s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1

gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
        'Body'].read()

fileobj = io.BytesIO(gz_file)

with gzip.open(fileobj, 'rb') as file:
    [do stuff]

Я также вручную разбил входной файл на несколько файлов с максимум 100 строками. Это дало желаемый эффект, дав мне больше картографов, но затем я столкнулся с ошибкой ConnectionError из вызова s3client.get_object():

Traceback (most recent call last):
  File "dmapper.py", line 103, in <module>
    commoncrawl_reader(base_url, full_url, offset, length, warc_file)
  File "dmapper.py", line 14, in commoncrawl_reader
    gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
    operation_model, request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
    success_response, exception):
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
    return self._emit(event_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
    response = handler(**kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
    proxies=self.proxies, timeout=self.timeout)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
    raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

В настоящее время я запускаю это только с несколькими URL-адресами, но мне нужно будет сделать это с несколькими тысячами (каждый со многими подкаталогами), как только я заработаю.

Я не уверен, с чего начать с исправления этого. Я чувствую, что весьма вероятно, что есть лучший подход, чем то, что я пытаюсь. Тот факт, что маппер, кажется, занимает много времени для каждого URL-адреса, кажется большим свидетельством того, что я подхожу к этому неправильно. Следует также отметить, что преобразователь и преобразователь работают правильно, если они запускаются напрямую как команда канала:

"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> Производит желаемый вывод, но может занять слишком много времени для запуска по всему списку URL.

Любое руководство будет с благодарностью.

1 ответ

Решение

API MapReduce предоставляет NLineInputFormat. Свойство "mapreduce.input.lineinputformat.linespermap" позволяет контролировать, сколько строк (в данном случае записей WARC) максимально передается в преобразователь. Работает с mrjob, ср. Илья в WARC индексатор.

Что касается ошибки соединения S3: лучше выполнить задание в регионе AWS us-east-1, где расположены данные.

Другие вопросы по тегам