Как мы можем сортировать и группировать данные из Spark RDD?

Данные в файле data.csv:

07:36:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:33:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:34:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:35:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:44:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:45:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:46:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:47:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:48:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:36:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:38:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:39:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:50:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:51:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:52:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:53:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:54:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:40:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48

Это мой код:

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._

 object ScalaApp {
 def main(args: Array[String]) {
 val sc = new SparkContext("local[4]", "Program")

     // we take the raw data in CSV format and convert it into a

  val data = sc.textFile("data.csv")
 .map(line => line.split(","))

 .map(GroupRecord => (GroupRecord(0),
GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))

val numPurchases = data.count()
val d1=data.groupByKey(GroupRecord(2)) // here is the error

println("No: " + numPurchases)
println("Grouped Data" + d1)

}
}

Мне просто нужны те же данные, которые группируются по IP-адресу источника (2-й столбец) и по времени (1-й столбец). Итак, мои данные требуют:

  07:33:00 PM   172.20.16.105   104.70.250.141  80  57188   0.66
  07:34:00 PM   172.20.16.105   104.70.250.141  80  57188   0.47
  07:35:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:37:00 PM   172.20.16.105   104.70.250.141  80  57188   0.66
  07:38:00 PM   172.20.16.105   104.70.250.141  80  57188   0.47
  07:39:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:40:00 PM   172.20.16.105   104.70.250.141  80  57188   0.48
  07:44:00 PM   172.20.16.106   104.70.250.141  80  57188   0.49
  07:45:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:46:00 PM   172.20.16.106   104.70.250.141  80  57188   0.33
  07:47:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:50:00 PM   172.20.16.106   104.70.250.141  80  57188   0.49
  07:51:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:52:00 PM   172.20.16.106   104.70.250.141  80  57188   0.33
  07:53:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:54:00 PM   172.20.16.106   104.70.250.141  80  57188   0.48
  07:36:00 PM   172.20.16.107   104.70.250.141  80  57188   0.48
  07:37:00 PM   172.20.16.107   104.70.250.141  80  57188   0.48

но у меня есть проблемы с моим кодом, так что, пожалуйста, помогите мне!

4 ответа

Решение

Как указал Гленни, вы не создаете пару ключ-значение для groupByKey операция. Тем не менее, вы также можете использовать groupBy(_._3) чтобы получить тот же результат. Чтобы отсортировать каждую группу по первому столбцу, вы можете применить flatMapValues после группировки отсортировать элементы в каждой группе. Следующий код делает именно это:

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val data = sc.textFile("data.csv")
      .map(line => line.split("\\s+"))
      .map(GroupRecord => (GroupRecord(2), (GroupRecord(0), GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5))))

    // sort the groups by the first tuple field
    val result = data.groupByKey.flatMapValues(x => x.toList.sortBy(_._1))

    // assign the partition ID to each item to see that each group is sorted
    val resultWithPartitionID = result.mapPartitionsWithIndex((id, it) => it.map(x => (id, x)))

    // print the contents of the RDD, elements of different partitions might be interleaved
    resultWithPartitionID foreach println

    val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2)

    // print collected results
    println(collectedResult.mkString("\n"))
  }

Ваша проблема в том, что ваш второй map создает Tuple6 вместо пары ключ-значение, что требуется, если вы хотите выполнить операцию xxxByKey. Если вы хотите сгруппировать по 2-му столбцу, вы должны сделать GroupRecord(1) ваш ключ и остальные значения, а затем позвоните groupByKey, как это:

data
  .map(GroupRecord => (GroupRecord(1),(GroupRecord(0),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
  .groupByKey()

Вышеупомянутое решение будет работать нормально. Но когда данные увеличиваются до большого размера, я не уверен, так как вам нужно обрабатывать перестановки

Лучший способ - создать фрейм данных и использовать порядок sqlContext по IP-адресу и времени.

Здесь мы должны преобразовать его в ключ, пары значений, чтобы применить механизм GroupByKey, и после этого значения будут преобразованы в набор Iterable и применить сортировку к значениям каждого ключа, нам нужно преобразовать его в последовательность, а затем применить функцию сортировки и после этого функция flatMap сведет последовательные значения в наборы String.

Data.csv ->

07:36:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:33:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:34:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:35:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:44:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:45:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:46:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:47:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:48:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:36:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.107   104.70.250.141  80  57188   0.48
07:37:00 PM 172.20.16.105   104.70.250.141  80  57188   0.66
07:38:00 PM 172.20.16.105   104.70.250.141  80  57188   0.47
07:39:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48
07:50:00 PM 172.20.16.106   104.70.250.141  80  57188   0.49
07:51:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:52:00 PM 172.20.16.106   104.70.250.141  80  57188   0.33
07:53:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:54:00 PM 172.20.16.106   104.70.250.141  80  57188   0.48
07:40:00 PM 172.20.16.105   104.70.250.141  80  57188   0.48

Код ->

val data = sc.textFile("src/Data.csv")
  .map(line => {
    val GroupRecord = line.split("\t")

    ((GroupRecord(1)), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5)))
  })

val numPurchases = data.count()

val d1 = data.groupByKey().map(f => (f._1, f._2.toSeq.sortBy(f => f._1))).flatMapValues(f => f).map(f => (f._2._1, f._1, f._2._2, f._2._3, f._2._4, f._2._5))

d1 foreach (println(_))

println("No: " + numPurchases)
Другие вопросы по тегам