Обработка многих архивов WARC из CommonCrawl с использованием потоковой передачи Hadoop и MapReduce
Я работаю над проектом, в котором мне нужно загрузить данные обхода (из CommonCrawl) для конкретных URL-адресов из контейнера S3, а затем обработать эти данные.
В настоящее время у меня есть задание MapReduce (Python через Hadoop Streaming), которое получает правильные пути к файлам S3 для списка URL-адресов. Затем я пытаюсь использовать второе задание MapReduce для обработки этого вывода путем загрузки данных из корзины S3 commoncrawl. В маппере я использую boto3, чтобы загрузить содержимое gzip для определенного URL-адреса из корзины commoncrawl S3, а затем вывести некоторую информацию о содержимом gzip (информацию о счетчике слов, длину содержимого, URL-адреса, связанные и т. Д.). Затем редуктор проходит через этот вывод, чтобы получить окончательное количество слов, список URL и т. Д.
Размер выходного файла из первого задания MapReduce составляет всего около 6 МБ (но он будет больше, когда мы масштабируемся до полного набора данных). Когда я запускаю второй MapReduce, этот файл разделяется только дважды. Обычно это не проблема для такого маленького файла, но код сопоставления, который я описал выше (выборка данных S3, выделение отображенного вывода и т. Д.), Требует времени для запуска для каждого URL. Поскольку файл разбивается только дважды, выполняется только 2 сопоставления. Мне нужно увеличить количество разбиений, чтобы сопоставление можно было выполнить быстрее.
Я попытался установить "mapreduce.input.fileinputformat.split.maxsize" и "mapreduce.input.fileinputformat.split.minsize" для задания MapReduce, но это не меняет количество выполняемых разбиений.
Вот часть кода из картографа:
s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
'Body'].read()
fileobj = io.BytesIO(gz_file)
with gzip.open(fileobj, 'rb') as file:
[do stuff]
Я также вручную разбил входной файл на несколько файлов с максимум 100 строками. Это дало желаемый эффект, дав мне больше картографов, но затем я столкнулся с ошибкой ConnectionError из вызова s3client.get_object():
Traceback (most recent call last):
File "dmapper.py", line 103, in <module>
commoncrawl_reader(base_url, full_url, offset, length, warc_file)
File "dmapper.py", line 14, in commoncrawl_reader
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
operation_model, request_dict)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
return self._send_request(request_dict, operation_model)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
success_response, exception):
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
caught_exception=caught_exception, request_dict=request_dict)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
return self._emit(event_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
response = handler(**kwargs)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
if self._checker(attempts, response, caught_exception):
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
return self._checker(attempt_number, response, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
attempt_number, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
raise caught_exception
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
proxies=self.proxies, timeout=self.timeout)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
r = adapter.send(request, **kwargs)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
В настоящее время я запускаю это только с несколькими URL-адресами, но мне нужно будет сделать это с несколькими тысячами (каждый со многими подкаталогами), как только я заработаю.
Я не уверен, с чего начать с исправления этого. Я чувствую, что весьма вероятно, что есть лучший подход, чем то, что я пытаюсь. Тот факт, что маппер, кажется, занимает много времени для каждого URL-адреса, кажется большим свидетельством того, что я подхожу к этому неправильно. Следует также отметить, что преобразователь и преобразователь работают правильно, если они запускаются напрямую как команда канала:
"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> Производит желаемый вывод, но может занять слишком много времени для запуска по всему списку URL.
Любое руководство будет с благодарностью.
1 ответ
API MapReduce предоставляет NLineInputFormat. Свойство "mapreduce.input.lineinputformat.linespermap" позволяет контролировать, сколько строк (в данном случае записей WARC) максимально передается в преобразователь. Работает с mrjob, ср. Илья в WARC индексатор.
Что касается ошибки соединения S3: лучше выполнить задание в регионе AWS us-east-1, где расположены данные.