PySpark + Google Cloud Storage (весь текстовый файл)

Я пытаюсь проанализировать около 1 миллиона HTML-файлов с помощью PySpark (Google Dataproc) и записать соответствующие поля в сжатый файл. Каждый файл HTML составляет около 200 КБ. Следовательно, все данные о 200GB.

Приведенный ниже код работает нормально, если я использую подмножество данных, но работает в течение нескольких часов, а затем вылетает при работе на всем наборе данных. Кроме того, рабочие узлы не используются (<5% ЦП), поэтому я знаю, что есть некоторые проблемы.

Я считаю, что система задыхается от приема данных из GCS. Есть лучший способ сделать это? Кроме того, когда я так использую wholeTextFiles, мастер пытается загрузить все файлы и затем отправить их исполнителям, или это позволяет исполнителям загрузить их?

def my_func(keyval):
   keyval = (file_name, file_str)
   return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")

2 ответа

Решение

Чтобы ответить на ваш вопрос, мастер не будет читать все содержащиеся в нем данные, но он получит статус для всех входных файлов перед началом работы. Dataproc по умолчанию устанавливает для свойства "mapreduce.input.fileinputformat.list-status.num-threads" значение 20, чтобы помочь сократить время этого поиска, но RPC по-прежнему выполняется для каждого файла в GCS.

Кажется, вы нашли случай, когда даже добавление потоков не очень помогает, а просто приводит драйвер к OOM быстрее.

Разбираясь в том, как распараллелить чтение, у меня есть две идеи.

Но сначала небольшое предупреждение: ни одно из этих решений, поскольку они являются очень надежными, к каталогам, включенным в глобус. Возможно, вы захотите защититься от каталогов, появляющихся в списке файлов для чтения.

Первый выполняется с помощью Python и инструментов командной строки hadoop (это также можно сделать с помощью gsutil). Ниже приведен пример того, как он может выглядеть и выполнять листинг файлов на рабочих, считывает содержимое файла в пары и, наконец, вычисляет пары (имя файла, длина файла):

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext

import sys
import subprocess


def hadoop_ls(file_glob):
  lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
  files = [line.split()[7] for line in lines if len(line) > 0]
  return files

def hadoop_cat(file):
  return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  # This is just for testing. You'll want to generate a list 
  # of prefix globs instead of having a list passed in from the 
  # command line.
  globs = sys.argv[1:]
  # Desired listing partition count
  lpc = 100
  # Desired 'cat' partition count, should be less than total number of files
  cpc = 1000
  files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
  files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
  files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

Сначала я хотел бы начать с этого решения подпроцесса и поиграть с разделением вызовов hadoop_ls и hadoop_cat и посмотреть, сможете ли вы получить что-то приемлемое.

Второе решение является более сложным, но, вероятно, даст более производительный конвейер, избегая многих, многих вызовов exec.

Во втором решении мы собираем специальный jar-помощник, используя действие инициализации, чтобы скопировать этот jar всем работникам, и, наконец, используем помощник из нашего драйвера.

Окончательная структура каталогов нашего проекта jar scala будет выглядеть примерно так:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt

В нашем файле PysparkHelper.scala у нас будет небольшой класс scala, который функционирует так же, как и наше решение на чистом Python выше. Сначала мы создадим СДР файловых глобусов, затем СДР с именами файлов и, наконец, СДР из пар файлов и их содержимого.

package com.google.cloud.dataproc.support

import collection.JavaConversions._

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

import java.util.ArrayList
import java.nio.charset.StandardCharsets

class PysparkHelper extends Serializable {
  def wholeTextFiles(
    context: JavaSparkContext,
    paths: ArrayList[String],
    partitions: Int): JavaPairRDD[String, String] = {

    val globRDD = context.sc.parallelize(paths).repartition(partitions)
    // map globs to file names:
    val filenameRDD = globRDD.flatMap(glob => {
      val path = new Path(glob)
      val fs: FileSystem = path.getFileSystem(new Configuration)
      val statuses = fs.globStatus(path)
      statuses.map(s => s.getPath.toString)
    })
    // Map file name to (name, content) pairs:
    // TODO: Consider adding a second parititon count parameter to repartition before
    // the below map.
    val fileNameContentRDD = filenameRDD.map(f => {
      Pair(f, readPath(f, new Configuration))
    })

    new JavaPairRDD(fileNameContentRDD)
  }

  def readPath(file: String, conf: Configuration) = {
    val path = new Path(file)
    val fs: FileSystem = path.getFileSystem(conf)
    val stream = fs.open(path)
    try {
      IOUtils.toString(stream, StandardCharsets.UTF_8)
    } finally {
      stream.close()
    }
  }
}

Файл helper/build.sbt будет выглядеть примерно так:

organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies +=  "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true

Затем мы можем собрать помощника с помощью sbt:

$ cd helper && sbt package

Выходной вспомогательный файл jar должен быть target/scala-2.10/pyspark_support_2.10-0.1.jar

Теперь нам нужно поместить этот jar в наш кластер, и для этого нам нужно сделать две вещи: 1) загрузить jar в GCS и 2) создать действие инициализации в GCS, чтобы скопировать jar в узлы кластера.

В целях иллюстрации, давайте предположим, что ваш контейнер называется MY_BUCKET (вставьте соответствующий мем, связанный с моржом, здесь).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar

Создайте действие инициализации (назовем его pyspark_init_action.sh, заменив MY_BUCKET при необходимости):

#!/bin/bash

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/

и, наконец, загрузите действие инициализации в GCS:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh

Теперь кластер можно запустить, передав следующие флаги gcloud:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh

После сборки, загрузки и установки нашей новой библиотеки мы, наконец, можем использовать ее из pyspark:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer

import sys

class DataprocUtils(object):

  @staticmethod
  def wholeTextFiles(sc, glob_list, partitions):
    """
    Read whole text file content from GCS.
    :param sc: Spark context
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset.
    :param partitions: number of partitions to use when creating the RDD
    :return: RDD of filename, filecontent pairs.
    """
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
               PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  globs = sys.argv[1:]
  partitions = 10
  files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
  files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

Спасибо! Я попробовал первый способ. Это работает, но не очень эффективно из-за вызовов exec и издержек RPC/auth. Работа в кластере из 32 узлов занимает около 10 часов. Я смог запустить его за 30 минут на кластере из 4 узлов, используя блоки данных на aws с разъемом Amazon s3. Кажется, там намного меньше накладных расходов. Я хотел бы, чтобы Google предоставил лучший способ получения данных из GCS в Spark.

Другие вопросы по тегам