Как написать потоковое mapreduce для файлов Warc в Python

Я пытаюсь написать задание mapreduce для файлов warc, используя WARC библиотеку python. Следующий код работает для меня, но мне нужен этот код для заданий hadoop mapreduce.

import warc
f = warc.open("test.warc.gz")
for record in f:
    print record['WARC-Target-URI'], record['Content-Length']

Я хочу, чтобы этот код читал потоковый ввод из файлов warc, т.е.

zcat test.warc.gz | warc_reader.py

Пожалуйста, скажите мне, как я могу изменить этот код для потокового ввода. Спасибо

0 ответов

warc.open() это сокращение для warc.WARCFile(), а также warc.WARCFile() может получить fileobj аргумент, где sys.stdinэто именно файловый объект. Итак, вам нужно сделать что-то вроде этого:

import sys
import warc

f = warc.open(fileobj=sys.stdin)
for record in f:
    print record['WARC-Target-URI'], record['Content-Length']

Но при потоковой передаче hasoop все немного сложнее, когда ваш входной файл .gz, поскольку hadoop заменит все \r\n в файле WARC в \n, что нарушит формат WARC (обратитесь к этому вопросу: hadoop преобразовывает \r\n в \ n и нарушает формат ARC). Какwarc пакет использовать регулярное выражение "WARC/(\d+.\d+)\r\n" для соответствия заголовкам (точное соответствие \r\n), вы, вероятно, получите такую ​​ошибку:

IOError: Bad version line: 'WARC/1.0\n'

Итак, вы либо измените свой PipeMapper.java файл, как рекомендуется в упомянутом вопросе, или напишите свой собственный сценарий синтаксического анализа, который анализирует файл WARC построчно.

Кстати, просто изменив warc.py использовать \n вместо того \r\n в сопоставлении заголовков не будет работать, потому что он читает контент точно так же, как длина Content-Length, и после этого ожидаются две пустые строки. Следовательно, то, что делает hadoop, определенно приведет к несоответствию длины содержимого атрибутуContent-Length поэтому вызовите другую ошибку, например:

IOError: Expected '\n', found 'abc\n'
Другие вопросы по тегам