Почему увеличивается число процессов, когда многопоточность с PySpark портит работу реального кода?

У меня есть отдельные файлы частей avro (с различными типами схем), которые я должен циклически просматривать, читать и хранить, чтобы я мог выполнять другие операции и в конечном итоге объединять их.

Я использую пакет многопроцессорной обработки Python для вызова ThreadPool и применения своей пользовательской функции к итерируемому пути к файлу. С небольшим количеством потоков код работает нормально. Но поскольку я продолжаю увеличивать количество рабочих процессов, хотя код выполняется быстрее, выходные значения являются неточными и все испорчено.

Это мой сценарий вкратце:

Файл A: количество строк - 119158 Файл B: количество строк - 247487

Образец кода:

#filepath_list = list of the two filepaths

def my_custom_function(k):
    quotes_df = sqlc.read.format("com.databricks.spark.avro").load(k)
    quotes_df.registerTempTable("quotes_df")
    quotes_each = sqlc.sql("Select column_1, column_2 from quotes_df")
    return quotes_each.count()

tpool = ThreadPool(np)
results = tpool.map(my_custom_function, filepath_list)
tpool.close()
tpool.join()

Когда np = 1, print(results) возвращает (119158, 247487). Но при np > 1 print (results) дает (119158, 119158) или (247487, 247487) случайным образом.

Примечание. По мере того, как список путей к файлам продолжает расти, "порог ошибки" для np также увеличивается. Например, с 50 путями к файлам код работает нормально при np = 4, но начинает портиться при np > 4.

Есть ли у кого-нибудь мысли о том, почему это должно происходить и какие могут быть возможные решения?

Благодарю.

0 ответов

Другие вопросы по тегам