Как мне прочитать большой CSV-файл с помощью класса Scala Stream?
Как мне прочитать большой CSV-файл (> 1 Гб) с помощью Scala Stream? У вас есть пример кода? Или вы бы использовали другой способ для чтения большого CSV-файла без предварительной загрузки его в память?
3 ответа
Просто используйте Source.fromFile(...).getLines
как вы уже заявили.
Это возвращает Iterator, который уже ленив (вы бы использовали поток в качестве ленивой коллекции, где вы хотели, чтобы ранее полученные значения были запомнены, чтобы вы могли прочитать их снова)
Если у вас проблемы с памятью, то проблема будет в том, что вы делаете после getLines. Любая операция, как toList
, что вызывает строгий сбор, вызовет проблему.
Я надеюсь, что вы не имеете в виду Скала collection.immutable.Stream
с потоком. Это не то, что вы хотите. Поток ленив, но делает запоминание.
Я не знаю, что вы планируете делать, но простое чтение файла построчно должно работать очень хорошо, без использования большого количества памяти.
getLines
следует оценивать лениво и не вылетать (если в вашем файле не более 2? 2 строк, афаик). Если это так, спросите на #scala или подайте заявку на ошибку (или сделайте оба).
Если вы хотите обрабатывать большой файл построчно, не требуя загрузки всего содержимого файла сразу в память, вы можете использовать Iterator
вернулся scala.io.Source
,
У меня есть небольшая функция, tryProcessSource
, (содержащий две подфункции), которые я использую именно для этих типов сценариев использования. Функция принимает до четырех параметров, из которых требуется только первый. Другие параметры имеют нормальные значения по умолчанию.
Вот профиль функции (полная реализация функции внизу):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
Первый параметр, file: File
, необходимо. И это просто любой действительный экземпляр java.io.File
который указывает на строчно-ориентированный текстовый файл, например CSV.
Второй параметр, parseLine: (Int, String) => Option[List[String]]
не является обязательным. И если это предусмотрено, это должна быть функция, ожидающая получения двух входных параметров; index: Int
, unparsedLine: String
, А потом вернуть Option[List[String]]
, Функция может вернуть Some
завернутый List[String]
состоящий из допустимых значений столбца. Или это может вернуть None
что указывает на то, что весь процесс потоковой передачи прерывается на ранней стадии. Если этот параметр не указан, значением по умолчанию является (index, line) => Some(List(line))
предоставлен. Это значение по умолчанию приводит к тому, что вся строка возвращается как одна String
значение.
Третий параметр, filterLine: (Int, List[String]) => Option[Boolean]
не является обязательным. И если это предусмотрено, это должна быть функция, ожидающая получения двух входных параметров; index: Int
, parsedValues: List[String]
, А потом вернуть Option[Boolean]
, Функция может вернуть Some
завернутый Boolean
указывает, должна ли эта конкретная строка быть включена в вывод. Или это может вернуть None
что указывает на то, что весь процесс потоковой передачи прерывается на ранней стадии. Если этот параметр не указан, значением по умолчанию является (index, values) => Some(true)
предоставлен. Это значение по умолчанию приводит к включению всех строк.
Четвертый и последний параметр, retainValues: (Int, List[String]) => Option[List[String]]
не является обязательным. И если это предусмотрено, это должна быть функция, ожидающая получения двух входных параметров; index: Int
, parsedValues: List[String]
, А потом вернуть Option[List[String]]
, Функция может вернуть Some
завернутый List[String]
состоящий из некоторого подмножества и / или изменения существующих значений столбца. Или это может вернуть None
что указывает на то, что весь процесс потоковой передачи прерывается на ранней стадии. Если этот параметр не указан, значением по умолчанию является (index, values) => Some(values)
предоставлен. Это значение по умолчанию приводит к значениям, проанализированным вторым параметром, parseLine
,
Рассмотрим файл со следующим содержимым (4 строки):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
Следующий профиль вызова...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
... результаты в этом выводе для tryLinesDefaults
(неизменное содержимое файла):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
Следующий профиль вызова...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
... результаты в этом выводе для tryLinesParseOnly
(каждая строка разбирается в значения отдельных столбцов):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
Следующий профиль вызова...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
... результаты в этом выводе для tryLinesIrvingTxNoHeader
(каждая строка анализируется в значениях отдельных столбцов, без заголовка и только в двух строках в Irving,Tx):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
Вот весь tryProcessSource
реализация функции:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
Хотя это решение является относительно лаконичным, мне потребовалось немало времени и много раз на рефакторинг, прежде чем я наконец смог добраться сюда. Пожалуйста, дайте мне знать, если вы видите, как это можно улучшить.
ОБНОВЛЕНИЕ: я только что задал вопрос ниже, поскольку это собственный вопрос Stackru. И теперь у него есть ответ, исправляющий ошибку, упомянутую ниже.
У меня была идея попробовать сделать это еще более общим, изменив retainValues
параметр для transformLine
с новым определением обобщенной функции ниже. Тем не менее, я продолжаю получать ошибку выделения в IntelliJ "Выражение типа Some[List[String]] не соответствует ожидаемому типу Option[A]" и не смог выяснить, как изменить значение по умолчанию, чтобы ошибка уходит.
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
Любая помощь о том, как сделать эту работу будет принята с благодарностью.