Ошибка нехватки памяти при сборе данных из кластера Spark
Я знаю, что в SO много вопросов об ошибках нехватки памяти в Spark, но я не нашел своего решения.
У меня есть простой рабочий процесс:
- читать в ORC файлы из Amazon S3
filter
до небольшого подмножества строкselect
небольшое подмножество столбцовcollect
в узел драйвера (так что я могу делать дополнительные операции вR
)
Когда я запускаю выше, а затем cache
таблица для запуска памяти занимает <2 ГБ - крошечные по сравнению с памятью, доступной для моего кластера - тогда я получаю ошибку OOM при попытке collect
данные на мой узел драйвера.
Я попытался запустить на следующих установках:
- локальный режим на компьютере с 32 ядрами и 244 ГБ оперативной памяти
- автономный режим с исполнителями 10 x 6,2 ГБ и узлом драйвера 61 ГБ
Для каждого из них я играл с многочисленными конфигурациями executor.memory
, driver.memory
, а также driver.maxResultSize
чтобы охватить весь диапазон возможных значений в моей доступной памяти, но всегда я заканчиваю с ошибкой нехватки памяти в collect
этап; или java.lang.OutOfMemoryError: Java heap space
, java.lang.OutOfMemoryError : GC overhead limit exceeded
, или же Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.
(а sparklyr
ошибка, указывающая на проблемы с памятью).
Исходя из моего [ограниченного] понимания Spark, кэширование таблицы перед сборкой должно форсировать все вычисления - т. Е. Если таблица хорошо сидит в памяти после кэширования на уровне <2 ГБ, тогда мне не нужно для сбора более 2 ГБ памяти. это в узел драйвера.
Обратите внимание, что у ответов на этот вопрос есть некоторые предложения, которые я еще не попробовал, но они, вероятно, повлияют на производительность (например, сериализацию RDD), поэтому по возможности избегайте их использования.
Мои вопросы:
- Как может случиться, что кадр данных, который занимает так мало места после его кэширования, может вызвать проблемы с памятью?
- Есть ли что-то очевидное для меня, чтобы проверить / изменить / устранить неполадки, чтобы помочь решить проблему, прежде чем перейти к дополнительным параметрам, которые могут поставить под угрозу производительность?
Спасибо
Изменить: примечание в ответ на комментарий @ Шайдо ниже, призывая cache
через Sparklyr "принудительно загружает данные в память, выполняя count(*)
над таблицей " [из документации Sparklyr] - т.е. таблица должна находиться в памяти, и все вычисления выполняются (я полагаю) до вызова collect
,
Изменить: некоторые дополнительные замечания, так как следуют предложениям ниже:
- Согласно комментариям ниже, я теперь попытался записать данные в CSV вместо сбора, чтобы получить представление о вероятном размере файла. Эта операция создает набор csvs объемом ~3 ГБ и занимает всего 2 секунды при запуске после кэширования.
- Если я установлю
driver.maxResultSize
до <1G я получаю сообщение об ошибке, утверждающее, что размер сериализованного СДР составляет 1030 МБ, больше, чем driver.maxResultSize. - Если я смотрю использование памяти в диспетчере задач после вызова
collect
Я вижу, что использование просто продолжает расти до тех пор, пока не достигнет ~ 90 ГБ, после чего происходит ошибка OOM. Так что по какой-либо причине объем оперативной памяти, используемой для выполненияcollect
операция в ~100 раз больше, чем размер СДР, который я пытаюсь собрать.
Изменить: код добавлен ниже, как и просили в комментариях.
#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop")
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g'
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
invoke("read") %>%
invoke("format", "orc") %>%
invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>%
invoke("createOrReplaceTempView", "alldatadf")
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2)
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7)
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory
# filter by month and year, using ORC partitions for extra speed
filter(((date_year==year_y1 & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
(date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
(date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
(date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
# filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
# filter by advertiser ID
filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) &
!is.na(advertiser_id)) |
((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 |
floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
# Define cols to keep
transmute(time=as.numeric(event_time/1000000),
user_id=as.character(user_id),
action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
activity_lookup=as.character(activity_id),
sv1=as.character(segment_value_1),
other_data=as.character(other_data)) %>%
mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)
2 ответа
Когда вы говорите, что собирают на фрейме данных, происходит 2 вещи,
- Во-первых, все данные должны быть записаны в вывод на драйвер.
- Драйвер должен собирать данные со всех узлов и хранить в своей памяти.
Ответ:
Если вы хотите просто загрузить данные в память исполнителей, count() также является действием, которое загружает данные в память исполнителя, которые могут использоваться другими процессами.
Если вы хотите извлечь данные, попробуйте это вместе с другими свойствами при обработке данных "--conf spark.driver.maxResultSize=10g".
Как упоминалось выше, "кеш" не является действием, проверьте стойкость СДР:
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes.
Но "сбор" - это действие, и все вычисления (включая "кэш") будут запущены при вызове "сбор".
Вы запускаете приложение в автономном режиме, это означает, что начальная загрузка данных и все вычисления будут выполняться в одной и той же памяти.
Загрузка данных и другие вычисления используются большей частью памяти, а не "собирать".
Вы можете проверить это, заменив "собирать" на "считать".