Как написать потоковое mapreduce для файлов Warc в Python
Я пытаюсь написать задание mapreduce для файлов warc, используя WARC библиотеку python. Следующий код работает для меня, но мне нужен этот код для заданий hadoop mapreduce.
import warc
f = warc.open("test.warc.gz")
for record in f:
print record['WARC-Target-URI'], record['Content-Length']
Я хочу, чтобы этот код читал потоковый ввод из файлов warc, т.е.
zcat test.warc.gz | warc_reader.py
Пожалуйста, скажите мне, как я могу изменить этот код для потокового ввода. Спасибо
0 ответов
warc.open()
это сокращение для warc.WARCFile()
, а также warc.WARCFile()
может получить fileobj
аргумент, где sys.stdin
это именно файловый объект. Итак, вам нужно сделать что-то вроде этого:
import sys
import warc
f = warc.open(fileobj=sys.stdin)
for record in f:
print record['WARC-Target-URI'], record['Content-Length']
Но при потоковой передаче hasoop все немного сложнее, когда ваш входной файл .gz
, поскольку hadoop заменит все \r\n
в файле WARC в \n
, что нарушит формат WARC (обратитесь к этому вопросу: hadoop преобразовывает \r\n в \ n и нарушает формат ARC). Какwarc
пакет использовать регулярное выражение "WARC/(\d+.\d+)\r\n"
для соответствия заголовкам (точное соответствие \r\n
), вы, вероятно, получите такую ошибку:
IOError: Bad version line: 'WARC/1.0\n'
Итак, вы либо измените свой PipeMapper.java
файл, как рекомендуется в упомянутом вопросе, или напишите свой собственный сценарий синтаксического анализа, который анализирует файл WARC построчно.
Кстати, просто изменив warc.py
использовать \n
вместо того \r\n
в сопоставлении заголовков не будет работать, потому что он читает контент точно так же, как длина Content-Length
, и после этого ожидаются две пустые строки. Следовательно, то, что делает hadoop, определенно приведет к несоответствию длины содержимого атрибутуContent-Length
поэтому вызовите другую ошибку, например:
IOError: Expected '\n', found 'abc\n'