Как выполнить преобразование агрегации в скрипте kiba etl (gem kiba)?

Я хочу написать сценарий Kiba Etl, который имеет источник из CSV в CSV-адресат со списком правил преобразования, среди которых 2-й преобразователь является агрегацией, в которой выполняется операция выбора имени, суммы (евро) по имени

Kiba ETL Script file

source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:name, :euro]

transform AggregateFields, { sum: :euro, group_by: :name}

transform RenameField,from: :euro, to: :total_amount

destination CsvDestination, 'result.csv', [:name, :total_amount]

users.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv (ожидаемый результат)

total_amount;name
16;Jack
97;Jill
99;Mack

Поскольку преобразователи etl выполняются один за другим в одной строке за раз, но мое поведение второго преобразователя зависит от всей коллекции строк, к которой я не могу получить доступ к ней в классе, который передается методу преобразования.

transform AggregateFields, { sum: :euro, group_by: :name }

Возможно ли, что такое поведение может быть достигнуто с помощью Kiba Gem
Заранее спасибо

1 ответ

Решение

Автор киба тут! Вы можете достичь этого разными способами, в основном в зависимости от размера данных и ваших реальных потребностей. Вот несколько возможностей.

Агрегирование с использованием переменной в вашем скрипте Kiba

require 'awesome_print'

transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end

Это самый простой способ, но он достаточно гибкий.

Это сохранит ваши агрегаты в памяти, так что это может быть достаточно хорошим или нет, в зависимости от вашего сценария. Обратите внимание, что в настоящее время Kiba является однопоточным (но "Kiba Pro" будет многопоточным), поэтому на данный момент нет необходимости добавлять блокировку или использовать потоковую структуру для агрегата.

Вызов TextQL из блоков post_process

Другой быстрый и простой способ агрегирования - сначала создать неагрегированный CSV-файл, а затем использовать TextQl для фактического агрегирования, например:

destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]

post_process do
  query = <<SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl group by name
SQL

  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end

Определены следующие помощники:

def system!(cmd)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end

Будьте осторожны с точностью при выполнении вычислений.

Запись места назначения агрегирования в памяти

Полезный трюк, который может сработать, заключается в том, чтобы обернуть заданный пункт назначения в класс для выполнения агрегации. Вот как это может выглядеть:

class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write, but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end

который вы можете использовать следующим образом:

# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end

Приятно то, что в этой версии вы можете повторно использовать агрегатор для разных целей (например, для базы данных или для чего-то еще).

Обратите внимание, что это сохранит все агрегаты в памяти, как и первая версия.

Вставка в магазин с возможностями агрегирования

Другой способ (особенно полезный, если у вас очень большие объемы) состоит в том, чтобы отправить полученные данные во что-то, что сможет объединить данные для вас. Это может быть обычная база данных SQL, Redis или что-то более необычное, к которому вы сможете обращаться по мере необходимости.

Как я уже сказал, реализация будет в значительной степени зависеть от ваших реальных потребностей. Надеюсь, вы найдете то, что работает для вас здесь!

Другие вопросы по тегам