ETL в CSV-файлы, разделить, а затем подтолкнуть к s3, чтобы быть использованными в красном смещении

Просто начав с Кибы, я не нашел ничего очевидного, но я мог бы просто направить своего внутреннего ребенка (который ищет свои туфли, глядя в потолок).

Я хочу сбросить очень большой стол в Amazon Redshift. Кажется, что самый быстрый способ сделать это - записать кучу CSV-файлов в корзину S3, а затем сообщить Redshift (через COPY команда), чтобы вытащить их. Волшебное масштабирование гремлинов сделает все остальное.

Итак, я думаю, что я хочу, чтобы Kiba писал CSV-файл для каждых 10 тыс. Строк данных, затем передавал его в s3, а затем начинал запись в новый файл. В конце сделайте вызов постобработки COPY

Итак, могу ли я "конвейерно" выполнить работу или это должен быть большой вложенный класс Destination?

т.е.

source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process

3 ответа

Решение

Тибо, я сделал что-то подобное, за исключением того, что я перенаправил его в Tempfile, я думаю...

require 'csv'

# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
#        each csv file is finished
module PacerPro
  class CSVDestination
    def initialize(limit: 1_000, callback: ->(obj) { })
      @limit = limit
      @callback = callback

      @csv = nil
      @row_count = 0
    end

    # @param row [Hash] returned from transforms
    def write(row)
      csv << row.values
      @row_count += 1
      return if row_count < limit

      self.close
    end

    # Called by Kiba when the transform pipeline is finished
    def close
      csv.close

      callback.call(csv)

      tempfile.unlink

      @csv = nil
      @row_count = 0
    end

    private

    attr_reader :limit, :callback
    attr_reader :row_count, :tempfile

    def csv
      @csv ||= begin
        @tempfile = Tempfile.new('csv')
        CSV.open(@tempfile, 'w')
      end
    end
  end
end

Автор Кибы здесь. Спасибо за попытку!

В настоящее время лучший способ реализовать это - создать то, что я бы назвал "местом назначения буферизации". (Версия этого, скорее всего, в какой-то момент закончится в Kiba Common).

(Пожалуйста, тщательно проверьте, я только что написал, что этим утром для вас, не запускал его вообще, хотя в прошлом я использовал менее универсальные версии. Также имейте в виду, что эта версия использует буфер в памяти для ваших 10k строк, поэтому увеличение числа до чего-то гораздо большего потребляет память. Однако можно также создать версию с наименьшим объемом памяти, которая будет записывать строки в файл по мере их получения)

class BufferingDestination
  def initialize(buffer_size:, on_flush:)
    @buffer = []
    @buffer_size
    @on_flush = on_flush
    @batch_index = 0
  end

  def write(row)
    @buffer << row
    flush if @buffer.size >= buffer_size
  end

  def flush
    on_flush.call(batch_index: @batch_index, rows: @buffer)
    @batch_index += 1
    @buffer.clear
  end

  def close
    flush
  end
end

Это то, что вы можете затем использовать следующим образом, например, здесь повторно использовать пункт назначения Kiba Common CSV (хотя вы также можете написать свой собственный):

require 'kiba-common/destinations/csv'

destination BufferingDestination,
  buffer_size: 10_000,
  on_flush: -> { |batch_index, rows|
    filename = File.join("output-#{sprintf("%08d", batch_index)}")
    csv = Kiba::Common::Destinations::CSV.new(
      filename: filename,
      csv_options: { ... },
      headers: %w(my fields here)
    )
    rows.each { |r| csv.write(r) }
    csv.close
  }

Затем вы можете вызвать COPY прямо в on_flush заблокировать после создания файла (если вы хотите, чтобы загрузка началась сразу же), или в post_process block (но это начнется только после того, как все CSV будут готовы, что может быть функцией, обеспечивающей некоторую форму транзакционной глобальной загрузки, если вы предпочитаете).

Вы могли бы начать фантазировать и запустить очередь потоков, чтобы фактически обрабатывать загрузку параллельно, если вам это действительно нужно (но тогда будьте осторожны с потоками зомби и т. Д.).

Другой способ состоит в том, чтобы иметь ETL-процессы "с несколькими шагами", с одним сценарием для генерации CSV и другим, выбирающим их для загрузки, запущенными одновременно (это то, что я объяснил в моем выступлении, например, в RubyKaigi 2018).

Дайте мне знать, как все работает для вас!

Я не уверен, что здесь точный вопрос. Но я думаю, что ваше решение кажется правильным, но несколько предложений, хотя.

  1. Вы можете рассмотреть вопрос о том, чтобы иметь более 10K записей на файл CSV и gzip их при отправке на S3.
  2. Ты хочешь увидеть menifest создание, содержащее список нескольких файлов, а затем запустить copy подача команд menifest файл в качестве ввода.
Другие вопросы по тегам