Spark и Python используют пользовательский формат файла / генератор в качестве входных данных для RDD
Я хотел бы спросить о возможностях ввода в Spark. Из http://spark.apache.org/docs/latest/programming-guide.html я вижу, что я могу использовать sc.textFile()
для чтения текстовых файлов в RDD, но я хотел бы выполнить некоторую предварительную обработку, прежде чем произойдет распространение в RDD, например, мой файл может быть в формате JSON, например. {id:123, text:"...", value:6}
и я хотел бы использовать только определенные поля JSON для дальнейшей обработки.
Моя идея заключалась в том, можно ли как-нибудь использовать генератор Python в качестве входных данных для SparkContext?
Или, если в Spark есть более естественный способ обработки пользовательских, а не текстовых файлов Spark?
РЕДАКТИРОВАТЬ:
Кажется, что принятый ответ должен сработать, но он переместил меня на мой более практичный следующий вопрос: Spark и Python пытаются разобрать википедию с помощью gensim.
2 ответа
Самый быстрый способ сделать это, вероятно, загрузить текстовый файл как есть и выполнить обработку, чтобы выбрать нужные поля в результирующем СДР. Это распараллеливает эту работу по всему кластеру и будет масштабироваться более эффективно, чем любая предварительная обработка на одном компьютере.
Для JSON (или даже XML) я не думаю, что вам нужен собственный формат ввода. Поскольку PySpark выполняется в среде Python, вы можете использовать функции, регулярно доступные вам в Python, для десериализации JSON и извлечения нужных полей.
Например:
import json
raw = sc.textFile("/path/to/file.json")
deserialized = raw.map(lambda x: json.loads(x))
desired_fields = deserialized.map(lambda x: x['key1'])
desired_fields
теперь СДР всех значений в key1
в оригинальном файле JSON.
Вы можете использовать этот шаблон, чтобы извлечь комбинацию полей, разделить их по пробелам или как угодно.
desired_fields = deserialized.map(lambda x: (x['key1'] + x['key2']).split(' '))
И если это становится слишком сложным, вы можете заменить lambda
с обычной функцией Python, которая выполняет всю необходимую предварительную обработку и просто вызывает deserialized.map(my_preprocessing_func)
,
Да, вы можете создать RDD из переменной Python, используя SparkContext.parallelize()
:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.count() # 5
Эта переменная также может быть итератором.