Как применить функцию к каждой строке в SparkR?
У меня есть файл в формате CSV, который содержит таблицу со столбцами "id", "timestamp", "action", "value" и "location". Я хочу применить функцию к каждой строке таблицы, и я уже написал код на R следующим образом:
user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
row <- user[i,]
if(user$action != "power")
curLocation <- row$value
user[i,"location"] <- curLocation
}
Скрипт R работает отлично, и теперь я хочу применить его SparkR. Однако я не смог получить доступ к i-й строке непосредственно в SparkR, и я не смог найти ни одной функции для манипулирования каждой строкой в документации SparkR.
Какой метод я должен использовать для достижения того же эффекта, что и в сценарии R?
Кроме того, по совету @chateaur, я попытался написать код с помощью функции dapply следующим образом:
curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
if(row$Action != "power|battery|level"){
curLoc <- row$Value
}
row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)
Я посмотрел предупреждающее сообщение, что условие имеет длину> 1, и будет использоваться только первый элемент, и я нашел что-то /questions/26973608/interpretatsiya-preduprezhdeniya-uslovie-imeet-dlinu-1-iz-funktsii-if/26973623#26973623. Это заставило меня задуматься, представляют ли параметр строки в функции dapply весь раздел моего фрейма данных, а не одну строку? Может быть, функция dapply не является желательным решением?
Позже я попытался изменить функцию в соответствии с рекомендациями @chateaur. Вместо использования dapply я использовал dapplyCollect, что избавляет меня от необходимости указывать схему. Оно работает!
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- "1"
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row$action != "power") {
curLocation <- row$value
}
partitionnedDf[i,"location"] <- curLocation
}
partitionnedDf
}
bw <- dapplyCollect(user, changeLocation)
1 ответ
Scorpion775,
Вы должны поделиться своим кодом sparkR. Не забывайте, что данные не обрабатываются одинаково в R и sparkR.
От: http://spark.apache.org/docs/latest/sparkr.html,
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
Тогда вы можете посмотреть на функцию dapply здесь: https://spark.apache.org/docs/2.1.0/api/R/dapply.html
Вот рабочий пример:
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- as.integer(1)
# Loop over each row of the partitionned data frame
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row[1] != "power") {
curLocation <- row[2]
}
partitionnedDf[i,3] <- curLocation
}
# Return modified data frame
partitionnedDf
}
# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")
head(collect(df))
# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
structField("location", "integer"))
# Change location of each row
df2 <- dapply(df, changeLocation, schema)
head(df2)