Неожиданные результаты при создании диктов и списков СДР в pyspark

Ниже простой pyspark скрипт, который пытается разбить СДР на словарь, содержащий несколько СДР.

Как показывает пример запуска, скрипт работает, только если мы collect() на промежуточных СДР по мере их создания. Конечно, я не хотел бы делать это на практике, так как это не масштабируется.

Что действительно странно, я не назначаю промежуточный collect() результаты к любой переменной. Таким образом, разница в поведении обусловлена ​​исключительно скрытым побочным эффектом вычислений, вызванных collect() вызов.

Предполагается, что Spark - очень функциональная структура с минимальными побочными эффектами. Почему возможно получить желаемое поведение, вызвав какой-то загадочный побочный эффект, используя collect()?

Ниже приведен прогон Spark 1.5.2, Python 2.7.10 и IPython 4.0.0.

spark_script.py

from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            d[key_value].collect()
    return d
def print_results(d):
    for k in d:
        print k
        pp(d[k].collect())    

rdd = sc.parallelize([
    {'color':'red','size':3},
    {'color':'red', 'size':7},
    {'color':'red', 'size':8},    
    {'color':'red', 'size':10},
    {'color':'green', 'size':9},
    {'color':'green', 'size':5},
    {'color':'green', 'size':50},    
    {'color':'blue', 'size':4},
    {'color':'purple', 'size':6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)

Пример запуска в оболочке IPython

In [1]: execfile('spark_script.py')
### run WITH collect in loop: 
blue
[{   'color': 'blue', 'size': 4}]
purple
[{   'color': 'purple', 'size': 6}]
green
[   {   'color': 'green', 'size': 9},
    {   'color': 'green', 'size': 5},
    {   'color': 'green', 'size': 50}]
red
[   {   'color': 'red', 'size': 3},
    {   'color': 'red', 'size': 7},
    {   'color': 'red', 'size': 8},
    {   'color': 'red', 'size': 10}]
### run WITHOUT collect in loop: 
blue
[{   'color': 'purple', 'size': 6}]
purple
[{   'color': 'purple', 'size': 6}]
green
[{   'color': 'purple', 'size': 6}]
red
[{   'color': 'purple', 'size': 6}]

1 ответ

Решение

Короткий ответ

Как оказалось, это не столько проблема Spark, сколько хитрая функция Python, называемая замыканиями с поздним связыванием. Быстрый способ заставить раннее связывание (желаемое поведение в этом случае) состоит в добавлении аргумента по умолчанию:

lambda row, key_value=key_value: row[key_field] == key_value

Другой способ - с functools.partial.

Длинный ответ

Когда в Python определена функция, любые параметры, поступающие извне функции, извлекаются из определяющей среды (лексическая область видимости), и это делается при оценке функции, а не при ее определении (позднее связывание). Итак, в лямбда-функции, используемой преобразованием фильтра, значение key_value не определяется, пока функция не будет оценена.

Вы можете начать видеть опасность здесь: key_value принимает несколько значений в цикле split_RDDs_by_key(), Что если, когда lambda оценивается, key_value больше не имеет значение, которое мы хотели? Функции часто оцениваются задолго до того, как они определены, особенно при работе с СДР. Из-за ленивой вычислительной семантики СДР лямбда не будет оцениваться до тех пор, пока не будет вызвано действие для извлечения данных, такое как collect() или же take(),

В split_RDD_by_key() мы переходим key_values и создание нового RDD для каждого значения. когда collect_in_loop=False, здесь нет collect() до после split_RDD_by_key() выполнено К тому времени цикл внутри завершен, и key_value теперь имеет значение 'purple' из последней итерации цикла. Когда все лямбды во всех RDD от split_RDD_by_key() оцениваются, все они установлены key_value на "фиолетовый" и получить "фиолетовые" ряды СДР.

когда collect_in_loop=Trueмы делаем collect() на каждой итерации, в результате чего лямбда будет оцениваться в той же итерации, где она была определена, и мы получим key_value мы ожидаем.

Этот пример на самом деле раскрывает интересную, тонкую деталь о замыканиях Python. Когда в цикле collect() запускает оценку лямбды, лямбда связывает значение. Но что делает лямбда при оценке позже? collect() заявления, когда key_value изменился (в определяющей среде) по сравнению с тем, что было при первой лямбда-оценке? Этот пример показывает, что все оценки закрытия функции основаны на привязке из первой оценки. "Вызов означает закрытие, раз и навсегда".

Другие вопросы по тегам