Как загрузить файл CSV в векторы Arache Arrow и сохранить файл стрелки на диск

В настоящее время я играю с API Java Apache Arrow (хотя я использую его из Scala для примеров кода), чтобы немного познакомиться с этим инструментом.

В качестве упражнения я решил загрузить файл CSV в векторы стрелок, а затем сохранить их в файл стрелок. Первая часть казалась достаточно легкой, и я попробовал это так:

val csvLines: Stream[Array[String]] = <open stream from CSV parser>

// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)

// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)

// Work on the data, zipping it with its index 
Stream.from(0)
  .zip(csvLines.tail) // Work on the tail (head contains the headers)
  .foreach(rowTup =>  // rowTup = (index, csvRow as an Array[String])
    Range(0, rowTup._2.size) // Iterate on each column...
      .foreach(columnNumber =>
        writeToMutator(
          mutators(columnNumber), // get that column's mutator
          idx=rowTup._1,          // pass the current row number
          data=rowTup._2(columnNumber) // pass the entry of the curernt column
        )
      )
)

С initVectors() а также writeToMutator() определяется как:

def initVectors(
  columns: Array[String], 
  alloc: RootAllocator): Array[NullableVarCharVector] = {

  // Initialize a vector for each column
  val vectors = columns.map(colName => 
    new NullableVarCharVector(colName, alloc))
  // 4096 size, for 1024 values initially. This is arbitrary
  vectors.foreach(_.allocateNew(2^12,1024))
  vectors
}

def writeToMutator(
  mutator: NullableVarCharVector#Mutator, 
  idx: Int, 
  data: String): Unit = {

  // The CSV may contain null values
  if (data != null) {
    val bytes = data.getBytes()
    mutator.setSafe(idx, bytes, 0, bytes.length)
  }
  mutator.setNull(idx)
}

(В настоящее время я не забочусь об использовании правильного типа и сохраняю все как строки, или VarChar в крачках

Так что на данный момент у меня есть коллекция NullableVarCharVector и может читать и писать из / в них. Все замечательно на этом этапе. Теперь, для следующего шага, мне было интересно узнать, как на самом деле обернуть их и сериализовать в файл стрелок. Я наткнулся на AbstractFieldWriter абстрактный класс, но как использовать реализации неясно.

Итак, вопрос в основном таков:

  • Каков (лучший? - кажется, несколько) способ сохранить несколько векторов в файл стрелки.
  • Есть ли другие способы загрузки столбцов CSV в векторы стрелок?

отредактировано, чтобы добавить: страница описания метаданных обеспечивает хороший общий обзор по этой теме.

Тестовые классы API, кажется, содержат несколько вещей, которые могут помочь. Я опубликую ответ с примером, как только попробую.

1 ответ

Решение

Глядя на TestArrowFile.java и BaseFileTest.java, я нашел:

  • Как записать файл с одной стрелкой на диск
  • Альтернативный способ заполнения векторов, поскольку моя первая попытка помешала мне собрать один файл стрелки (или, по крайней мере, сделать это простым способом).

Итак, заполнение векторов теперь выглядит так:

// Open stream of rows 
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// Define a parent to hold the vectors
val parent = MapVector.empty("parent", allocator)
// Create a new writer. VarCharWriterImpl would probably do as well?
val writer = new ComplexWriterImpl("root", parent)

// Initialise a writer for each column, using the header as the name
val rootWriter = writer.rootAsMap()
val writers = csvLines.head.map(colName => 
                                  rootWriter.varChar(colName))

Stream.from(0)
  .zip(csvLines.tail) // Zip the rows with their index
  .foreach( rowTup => { // Iterate on each (index, row) tuple
    val (idx, row) = rowTup
      Range(0, row.size) // Iterate on each field of the row
        .foreach(column =>
          Option(row(column)) // row(column) may be null,
            .foreach(str =>   // use the option as a null check
              write(writers(column), idx, allocator, str)
            )
      )
  }
)

toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file

с write определяется как:

def write(writer: VarCharWriter, idx: Int, 
  allocator: BufferAllocator, data: String): Unit = {
  // Set the position to the correct index
  writer.setPosition(idx)
  val bytes = data.getBytes()
  // Apparently the allocator is required again to build a new buffer
  val varchar = allocator.buffer(bytes.length)
  varchar.setBytes(0, data.getBytes())
  writer.writeVarChar(0, bytes.length, varchar)
}

def toFile(parent: FieldVector, fName: String): Unit = {
  // Extract a schema from the parent: that's the part I struggled with in the original question
  val rootSchema = new VectorSchemaRoot(parent)
  val stream = new FileOutputStream(fName)
  val fileWriter = new ArrowFileWriter(
                        rootSchema,
                        null, // We don't use dictionary encoding.
                        stream.getChannel)
  // Write everything to file...
  fileWriter.start()
  fileWriter.writeBatch()
  fileWriter.end()
  stream.close()
}

С вышеупомянутым я могу сохранить CSV в файл. Я проверил, что все прошло хорошо, прочитав его и снова преобразовав в CSV, и содержимое не изменилось.

Обратите внимание, что ComplexWriterImpl позволяет писать столбцы разных типов, что пригодится, чтобы избежать хранения числовых столбцов в виде строк.

(Сейчас я играю со стороной, связанной с чтением, эти вещи, вероятно, заслуживают своих собственных вопросов.)

Другие вопросы по тегам