Spark и Python пытаются разобрать википедию с помощью gensim

Исходя из моего предыдущего вопроса, Spark и Python используют собственный формат файла / генератор в качестве входных данных для RDD. Я думаю, что я должен иметь возможность анализировать практически любой ввод с помощью sc.textFile(), а затем использовать мои или некоторые пользовательские функции библиотеки.

Сейчас я особенно пытаюсь разобрать дамп википедии с помощью фреймворка gensim. Я уже установил gensim на своем главном узле и на всех моих рабочих узлах, и теперь я хотел бы использовать встроенную функцию gensim для анализа страниц википедии, вдохновленных этим списком вопросов (или итератором) кортежей, возвращаемых MAP (PySpark).

Мой код следующий:

import sys
import gensim
from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)

    sc = SparkContext(appName="Process wiki - distributed RDD")

    distData = sc.textFile(sys.argv[1])
    #take 10 only to see how the output would look like
    processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)

    print processed_data
    sc.stop()

Исходный код extract_pages можно найти по адресу https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py и, судя по моему опыту, он должен работать со Spark.

Но, к сожалению, когда я запускаю код, я получаю следующий журнал ошибок:

14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages
elems = (elem for _, elem in iterparse(f, events=("end",)))
File "<string>", line 52, in __init__
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">'
    org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

И тогда некоторые, вероятно, журнал Spark:

14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296

а также

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Я пробовал это без Spark успешно, поэтому проблема должна быть где-то в сочетании Spark и gensim, но я не очень понимаю ошибку, которую я получаю. Я не вижу чтения файлов в строке 190 gensim wikicorpus.py.

РЕДАКТИРОВАТЬ:

Добавлены еще несколько логов от Spark:

EDIT2:

Gensim использует от xml.etree.cElementTree import iterparse, документация здесь, которая может вызвать проблему. На самом деле он ожидает имя файла или файл, содержащий данные XML. Может ли СДР рассматриваться как файл, содержащий данные XML?

1 ответ

Я обычно работаю со Spark в Scala. Тем не менее вот мои мысли:

Когда вы загружаете файл через sc.textFile, это своего рода линейный итератор, который распределяется по вашим sparkWorkers. Я думаю, что, учитывая формат xml википедии, одна строка не обязательно соответствует анализируемому элементу xml, и, таким образом, вы получаете эту проблему.

то есть:

 Line 1 :  <item>
 Line 2 :  <title> blabla </title> <subitem>
 Line 3 : </subItem>
 Line 4 : </item>

Если вы попытаетесь разобрать каждую строку отдельно, она выдаст исключения, подобные тем, которые вы получили.

Мне обычно приходится возиться с дампом википедии, поэтому первым делом я превращаю его в "читаемую версию", которая легко переваривается Spark. То есть: одна строка на статью. Как только вы это сделаете, вы можете легко подать его в искру и выполнить все виды обработки. Не требуется много ресурсов для его преобразования

Взгляните на ReadableWiki: https://github.com/idio/wiki2vec

Другие вопросы по тегам