ETL в CSV-файлы, разделить, а затем подтолкнуть к s3, чтобы быть использованными в красном смещении
Просто начав с Кибы, я не нашел ничего очевидного, но я мог бы просто направить своего внутреннего ребенка (который ищет свои туфли, глядя в потолок).
Я хочу сбросить очень большой стол в Amazon Redshift. Кажется, что самый быстрый способ сделать это - записать кучу CSV-файлов в корзину S3, а затем сообщить Redshift (через COPY
команда), чтобы вытащить их. Волшебное масштабирование гремлинов сделает все остальное.
Итак, я думаю, что я хочу, чтобы Kiba писал CSV-файл для каждых 10 тыс. Строк данных, затем передавал его в s3, а затем начинал запись в новый файл. В конце сделайте вызов постобработки COPY
Итак, могу ли я "конвейерно" выполнить работу или это должен быть большой вложенный класс Destination?
т.е.
source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process
3 ответа
Тибо, я сделал что-то подобное, за исключением того, что я перенаправил его в Tempfile, я думаю...
require 'csv'
# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
# each csv file is finished
module PacerPro
class CSVDestination
def initialize(limit: 1_000, callback: ->(obj) { })
@limit = limit
@callback = callback
@csv = nil
@row_count = 0
end
# @param row [Hash] returned from transforms
def write(row)
csv << row.values
@row_count += 1
return if row_count < limit
self.close
end
# Called by Kiba when the transform pipeline is finished
def close
csv.close
callback.call(csv)
tempfile.unlink
@csv = nil
@row_count = 0
end
private
attr_reader :limit, :callback
attr_reader :row_count, :tempfile
def csv
@csv ||= begin
@tempfile = Tempfile.new('csv')
CSV.open(@tempfile, 'w')
end
end
end
end
Автор Кибы здесь. Спасибо за попытку!
В настоящее время лучший способ реализовать это - создать то, что я бы назвал "местом назначения буферизации". (Версия этого, скорее всего, в какой-то момент закончится в Kiba Common).
(Пожалуйста, тщательно проверьте, я только что написал, что этим утром для вас, не запускал его вообще, хотя в прошлом я использовал менее универсальные версии. Также имейте в виду, что эта версия использует буфер в памяти для ваших 10k строк, поэтому увеличение числа до чего-то гораздо большего потребляет память. Однако можно также создать версию с наименьшим объемом памяти, которая будет записывать строки в файл по мере их получения)
class BufferingDestination
def initialize(buffer_size:, on_flush:)
@buffer = []
@buffer_size
@on_flush = on_flush
@batch_index = 0
end
def write(row)
@buffer << row
flush if @buffer.size >= buffer_size
end
def flush
on_flush.call(batch_index: @batch_index, rows: @buffer)
@batch_index += 1
@buffer.clear
end
def close
flush
end
end
Это то, что вы можете затем использовать следующим образом, например, здесь повторно использовать пункт назначения Kiba Common CSV (хотя вы также можете написать свой собственный):
require 'kiba-common/destinations/csv'
destination BufferingDestination,
buffer_size: 10_000,
on_flush: -> { |batch_index, rows|
filename = File.join("output-#{sprintf("%08d", batch_index)}")
csv = Kiba::Common::Destinations::CSV.new(
filename: filename,
csv_options: { ... },
headers: %w(my fields here)
)
rows.each { |r| csv.write(r) }
csv.close
}
Затем вы можете вызвать COPY
прямо в on_flush
заблокировать после создания файла (если вы хотите, чтобы загрузка началась сразу же), или в post_process
block (но это начнется только после того, как все CSV будут готовы, что может быть функцией, обеспечивающей некоторую форму транзакционной глобальной загрузки, если вы предпочитаете).
Вы могли бы начать фантазировать и запустить очередь потоков, чтобы фактически обрабатывать загрузку параллельно, если вам это действительно нужно (но тогда будьте осторожны с потоками зомби и т. Д.).
Другой способ состоит в том, чтобы иметь ETL-процессы "с несколькими шагами", с одним сценарием для генерации CSV и другим, выбирающим их для загрузки, запущенными одновременно (это то, что я объяснил в моем выступлении, например, в RubyKaigi 2018).
Дайте мне знать, как все работает для вас!
Я не уверен, что здесь точный вопрос. Но я думаю, что ваше решение кажется правильным, но несколько предложений, хотя.
- Вы можете рассмотреть вопрос о том, чтобы иметь более 10K записей на файл CSV и
gzip
их при отправке на S3. - Ты хочешь увидеть
menifest
создание, содержащее список нескольких файлов, а затем запуститьcopy
подача командmenifest
файл в качестве ввода.