Spark и Python используют пользовательский формат файла / генератор в качестве входных данных для RDD

Я хотел бы спросить о возможностях ввода в Spark. Из http://spark.apache.org/docs/latest/programming-guide.html я вижу, что я могу использовать sc.textFile() для чтения текстовых файлов в RDD, но я хотел бы выполнить некоторую предварительную обработку, прежде чем произойдет распространение в RDD, например, мой файл может быть в формате JSON, например. {id:123, text:"...", value:6} и я хотел бы использовать только определенные поля JSON для дальнейшей обработки.

Моя идея заключалась в том, можно ли как-нибудь использовать генератор Python в качестве входных данных для SparkContext?

Или, если в Spark есть более естественный способ обработки пользовательских, а не текстовых файлов Spark?

РЕДАКТИРОВАТЬ:

Кажется, что принятый ответ должен сработать, но он переместил меня на мой более практичный следующий вопрос: Spark и Python пытаются разобрать википедию с помощью gensim.

2 ответа

Решение

Самый быстрый способ сделать это, вероятно, загрузить текстовый файл как есть и выполнить обработку, чтобы выбрать нужные поля в результирующем СДР. Это распараллеливает эту работу по всему кластеру и будет масштабироваться более эффективно, чем любая предварительная обработка на одном компьютере.

Для JSON (или даже XML) я не думаю, что вам нужен собственный формат ввода. Поскольку PySpark выполняется в среде Python, вы можете использовать функции, регулярно доступные вам в Python, для десериализации JSON и извлечения нужных полей.

Например:

import json

raw = sc.textFile("/path/to/file.json")
deserialized = raw.map(lambda x: json.loads(x))
desired_fields = deserialized.map(lambda x: x['key1'])

desired_fields теперь СДР всех значений в key1 в оригинальном файле JSON.

Вы можете использовать этот шаблон, чтобы извлечь комбинацию полей, разделить их по пробелам или как угодно.

desired_fields = deserialized.map(lambda x: (x['key1'] + x['key2']).split(' '))

И если это становится слишком сложным, вы можете заменить lambda с обычной функцией Python, которая выполняет всю необходимую предварительную обработку и просто вызывает deserialized.map(my_preprocessing_func),

Да, вы можете создать RDD из переменной Python, используя SparkContext.parallelize():

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.count()   # 5

Эта переменная также может быть итератором.

Другие вопросы по тегам