Scalding: flattening fields after groupBy

I've seen this question: Scalding: How to retain the other field after a groupBy('field){_.size}?

It's a real pain and a mess compared to Apache Pig... What am I doing wrong? Can I do the same thing as Pig's GENERATE(FLATTEN())?

I don't quite get it. Here is my Scalding code:

  def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))

And my test:

  "Take top 3" should "return most active pairs" in {
    Given{
      List( (1, 13, 7),
            (1, 13, 8),
            (1, 12, 9),
            (1, 11, 10),
            (2, 20, 21),
            (2, 20, 22)) withSchema (person1, person2, activityCount)
    } When {
      pipe:RichPipe => pipe.takeTop(3)
    } Then {
      buffer: mutable.Buffer[(Long, Long, Long)] =>
      println(buffer.toList)
      buffer.toList.size should equal(5)
      println (buffer.toList)

      buffer.toList should contain (1, 11, 10)
      buffer.toList should contain (1, 12, 9)
      buffer.toList should contain (1, 13, 8)
      buffer.toList should not contain (1, 13, 7)

      buffer.toList should contain (2, 20, 21)
      buffer.toList should contain (2, 20, 22)
    }
  }

And I get an exception at runtime:

14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
    at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
    at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
    at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 23 more

What am I doing wrong?
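Reading the `Caused by` line, the likely problem is that `sortedReverseTake[Long](activityCount -> top, topAmount)` sorts and keeps only the `activityCount` values, so `top` holds a list of `Long`s. `flattenTo[(Long, Long, Long)]` then tries to read each element as a `scala.Tuple3` and fails. Below is a minimal plain-Scala model of that behavior, not Scalding code: the names `OriginalTakeTopModel`, `topPerPerson` and `readAsTriple` are hypothetical, and the cast only imitates what the generated tuple setter appears to do.

```scala
// Plain-Scala model (NOT Scalding) of the question's first takeTop.
object OriginalTakeTopModel {
  // Rows are (person1, person2, activityCount), as in the question's test.
  val rows: List[(Long, Long, Long)] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
    (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))

  // Like sortedReverseTake[Long](activityCount -> top, 3): per person1, only the
  // activityCount values are sorted and kept, so each `top` is a List[Long].
  val topPerPerson: Map[Long, List[Long]] =
    rows.groupBy(_._1).map { case (p1, group) =>
      p1 -> group.map(_._3).sorted(Ordering[Long].reverse).take(3)
    }

  // Like flattenTo[(Long, Long, Long)]: each element of `top` is read as a Tuple3.
  // A boxed java.lang.Long is not a Tuple3, so the cast throws.
  def readAsTriple(elem: Any): Either[String, Long] =
    try {
      val t = elem.asInstanceOf[(Long, Long, Long)]
      Right(t._3) // accessing a field forces the cast to scala.Tuple3
    } catch {
      case e: ClassCastException => Left(e.getClass.getName)
    }
}
```

For person1 = 1 the model keeps `List(10, 9, 8)`: the other two columns are already gone before flattening, which is why there is nothing to flatten back into three fields.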

UPDATE:

I did it this way:

  def takeTop(topAmount: Int): Pipe = self
    .groupBy(person1) { _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount) }
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

The test passes, but I'm not sure this is a good approach...
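The fix works because the tuple passed to `sortedReverseTake` now carries all three fields, with `activityCount` placed first. Assuming `sortedReverseTake` sorts with the standard implicit `Ordering` for `(Long, Long, Long)`, which compares tuples element by element from left to right, `activityCount` becomes the primary sort key and `person1`, `person2` only break ties. A small plain-Scala sketch of that ordering (the name `TupleOrderingDemo` is hypothetical):

```scala
// Plain-Scala sketch of lexicographic tuple ordering (NOT Scalding code).
object TupleOrderingDemo {
  // Rows for person1 = 1, already reordered to (activityCount, person1, person2).
  val candidates: List[(Long, Long, Long)] = List(
    (7L, 1L, 13L), (8L, 1L, 13L), (9L, 1L, 12L), (10L, 1L, 11L))

  // The implicit Ordering for Tuple3 compares _1, then _2, then _3.
  val byTuple: Ordering[(Long, Long, Long)] = implicitly[Ordering[(Long, Long, Long)]]

  // Descending sort followed by take(n): the effect a reverse sorted take should have.
  def topN(n: Int): List[(Long, Long, Long)] =
    candidates.sorted(byTuple.reverse).take(n)
}
```

If `person2` were put first instead, the same code would rank rows by `person2`, not by activity.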

1 Answer

Solution
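  def takeTop(topAmount: Int): Pipe = self
    // Sort whole rows, with activityCount first so it is the primary sort key
    .groupBy(person1) { _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount) }
    // Each element of `top` is now a 3-tuple, so it can be flattened into three fields
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    // Restore the original field order
    .project(person1, person2, activityCount)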

It works; I didn't find a better approach.
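The whole accepted pipeline can be modeled per `person1` group in plain Scala. This is a sketch of the semantics rather than Scalding code, and it assumes `sortedReverseTake` keeps the first `topAmount` tuples in descending order, which is what the passing test indicates. `FixedTakeTopModel` and `sampleRows` are hypothetical names; the data is the question's test input.

```scala
// Plain-Scala model (NOT Scalding) of the accepted takeTop.
object FixedTakeTopModel {
  // (person1, person2, activityCount), the schema used in the question's test.
  type Row = (Long, Long, Long)

  val sampleRows: List[Row] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
    (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))

  def takeTop(rows: List[Row], topAmount: Int): List[Row] =
    rows.groupBy(_._1).toList.sortBy(_._1).flatMap { case (_, group) =>
      group
        .map { case (p1, p2, count) => (count, p1, p2) } // sort key first
        .sorted(Ordering[(Long, Long, Long)].reverse)     // descending tuple order
        .take(topAmount)                                   // keep topAmount rows per person1
        .map { case (count, p1, p2) => (p1, p2, count) } // like flattenTo + project
    }
}
```

Sorting the grouped keys only makes the model's output order deterministic; the Scalding job itself does not promise an order across groups, which is why the question's test checks membership with `contain` instead of comparing whole lists.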
