Масштабирование: как сохранить другое поле после groupBy('field){. Size}?

Поэтому мои входные данные имеют два поля / столбца: id1 и id2, и мой код выглядит следующим образом:

TextLine(args("input"))
.read
.mapTo('line->('id1,'id2)) {line: String =>
    val fields = line.split("\t")
        (fields(0),fields(1))
}
.groupBy('id2){.size}
.write(Tsv(args("output")))

В результате получается (что я предполагаю) два поля: id2 * size. Я немного застрял при выяснении, возможно ли сохранить значение id1, которое также было сгруппировано с id2, и добавить его в качестве другого поля?

1 ответ

Решение

Боюсь, ты не можешь сделать это хорошим способом. Подумайте о том, как это работает под капотом - он разбивает данные, которые нужно подсчитать, на куски и отправляет их различным процессам, каждый процесс считает свой кусок, а затем один редуктор добавляет их все в конце. Пока каждый процесс считает, он не знает весь размер, поэтому он не может добавить поле. Единственный способ - вернуться назад и добавить его к данным, как только будет известен весь размер (т. Е. Соединение).

Если каждая группа помещается в память (и вы можете настроить память), вы можете:

Tsv(args("input"), ('id1, 'id2))
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list))
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) {
  case (list, size) => list.map(record => (record._1, record._2, size))
}
.write(Tsv(args("output")))

Но если вашей системе не хватает памяти, вам придется использовать дорогостоящее соединение.

Примечание: вы можете использовать Tsv вместо TextLine, затем mapTo и расщепление.

Другие вопросы по тегам