Spark группировка, а затем сортировка (код Java)

У меня есть JavaPairRDD и мне нужно сгруппировать по ключу, а затем отсортировать его, используя значение внутри объекта MyObject.

Допустим, MyObject это:

class MyObject {
    Integer order;
    String name;
}

Пример данных:

1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}
2, {order:1, name:'Alfred'}
2, {order:3, name:'Ana'}
2, {order:2, name:'Jessica'}
3, {order:3, name:'Will'}
3, {order:2, name:'Mariah'}
3, {order:1, name:'Monika'}

Ожидаемый результат:

Раздел 1:

1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}

Раздел 2

2, {order:1, name:'Alfred'}
2, {order:2, name:'Jessica'}
2, {order:3, name:'Ana'}

Раздел 3:

3, {order:1, name:'Monika'}
3, {order:2, name:'Mariah'}
3, {order:3, name:'Will'}

Я использую ключ для разделения RDD, а затем использую MyObject.order для сортировки данных внутри раздела.

Моя цель состоит в том, чтобы получить только k-первые элементы в каждом отсортированном разделе, а затем уменьшить их до значения, рассчитанного другим атрибутом MyObject (AKA "первый N лучший из группы").

Как я могу это сделать?

1 ответ

Решение

Ты можешь использовать mapPartitions:

JavaPairRDD<Long, MyObject> sortedRDD = rdd.groupBy(/* the first number */)
    .mapPartitionsToPair(x -> {
        List<Tuple2<Long, MyObject>> values = toArrayList(x);
        Collections.sort(values, (x, y) -> x._2.order - y._2.order);

        return values.iterator();
     }, true);

Два основных момента:

  • toArrayList принимает Iterator и возвращает ArrayList. Вы должны реализовать это самостоятельно
  • важно иметь значение true в качестве второго аргумента mapPartitionsToPair, потому что оно сохранит разбиение
Другие вопросы по тегам