Параллельный спарк на итераторе с функцией

У меня есть итератор, который работает с последовательностью документов WARC и выдает измененные списки токенов для каждого документа:

class MyCorpus(object):
def __init__(self, warc_file_instance):
    self.warc_file = warc_file_instance
def clean_text(self, html):
    soup = BeautifulSoup(html) # create a new bs4 object from the html data loaded
    for script in soup(["script", "style"]): # remove all javascript and stylesheet code
        script.extract()
    # get text
    text = soup.get_text()
    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
def __iter__(self):
    for r in self.warc_file:
        try:
            w_trec_id = r['WARC-TREC-ID']
            print w_trec_id
        except KeyError:
            pass
        try:
            text = self.clean_text(re.compile('Content-Length: \d+').split(r.payload)[1])
            alnum_text = re.sub('[^A-Za-z0-9 ]+', ' ', text)
            yield list(set(alnum_text.encode('utf-8').lower().split()))
        except:
            print 'An error occurred'

Теперь я применяю apache spark paraellize для дальнейшего применения желаемых функций карты:

warc_file = warc.open('/Users/akshanshgupta/Workspace/00.warc')
documents = MyCorpus(warc_file) 
x = sc.parallelize(documents, 20)
data_flat_map = x.flatMap(lambda xs: [(x, 1) for x in xs])
sorted_map = data_flat_map.sortByKey()
counts = sorted_map.reduceByKey(add)
print(counts.max(lambda x: x[1]))

У меня есть следующие сомнения:

  1. Это лучший способ для достижения этого или есть более простой способ?
  2. Когда я распараллеливаю итератор, происходит ли фактическая обработка параллельно? Это все еще последовательно?
  3. Что делать, если у меня есть несколько файлов? Как я могу масштабировать это до очень большого корпуса, скажем, туберкулеза?

1 ответ

Больше из контекста Scala, но:

  1. У меня есть одно сомнение в том, что я делаю sortByKey, а не ReduByKey.
  2. Обработка идет параллельно, если используются map, foreachPartition, Dataframe Writer и т. Д. Или чтение с помощью sc и sparksession, а парадигма Spark обычно подходит для непоследовательных зависимых алгоритмов. mapPartitions и другие API, как правило, используются для повышения производительности. Эта функция должна быть частью mapPartitions, я думаю, или использовать в сочетании с картой или в закрытии карты. Обратите внимание на сериализуемые проблемы, см.:

  3. Больше ресурсов компьютера позволяет больше масштабировать с лучшей производительностью, пропускной способностью.

Другие вопросы по тегам