Обработка (OSM) PBF файлов в Spark

Данные OSM доступны в формате PBF. Существуют специализированные библиотеки (например, https://github.com/plasmap/geow для анализа этих данных).

Я хочу сохранить эти данные на S3 и проанализировать данные в СДР как часть задания EMR.

Какой простой способ добиться этого? Могу ли я получить файл на главный узел и обработать его локально? Если так, я бы создал пустое СДР и добавил бы к нему, поскольку потоковые события анализируются из входного файла?

4 ответа

Одним из решений было бы пропустить PBF. Одно искробезопасное представление - Паркет. В этом посте показано, как преобразовать PBF в паркет и как загрузить данные в Spark.

Я выпустил новую версию Osm4Scala, которая включает поддержку Spark 2 и 3.

На README.md много примеров.

Использовать действительно просто:

scala> val osmDF = spark.sqlContext.read.format("osm.pbf").load("<osm files path here>")
osmDF: org.apache.spark.sql.DataFrame = [id: bigint, type: tinyint ... 5 more fields]

scala> osmDF.createOrReplaceTempView("osm")

scala> spark.sql("select type, count(*) as num_primitives from osm group by type").show()
+----+--------------+                                                           
|type|num_primitives|
+----+--------------+
|   1|        338795|
|   2|         10357|
|   0|       2328075|
+----+--------------+

scala> spark.sql("select distinct(explode(map_keys(tags))) as tag_key from osm order by tag_key asc").show()
+------------------+                                                            
|           tag_key|
+------------------+
|             Calle|
|        Conference|
|             Exper|
|             FIXME|
|         ISO3166-1|
|  ISO3166-1:alpha2|
|  ISO3166-1:alpha3|
| ISO3166-1:numeric|
|         ISO3166-2|
|           MAC_dec|
|            Nombre|
|            Numero|
|              Open|
|        Peluqueria|
|    Residencia UEM|
|          Telefono|
|         abandoned|
| abandoned:amenity|
| abandoned:barrier|
|abandoned:building|
+------------------+
only showing top 20 rows

scala> spark.sql("select id, latitude, longitude, tags from osm where type = 0").show()
+--------+------------------+-------------------+--------------------+
|      id|          latitude|          longitude|                tags|
+--------+------------------+-------------------+--------------------+
|  171933|          40.42006|-3.7016600000000004|                  []|
|  171946|          40.42125|-3.6844500000000004|[highway -> traff...|
|  171948|40.420230000000004|-3.6877900000000006|                  []|
|  171951|40.417350000000006|-3.6889800000000004|                  []|
|  171952|          40.41499|-3.6889800000000004|                  []|
|  171953|          40.41277|-3.6889000000000003|                  []|
|  171954|          40.40946|-3.6887900000000005|                  []|
|  171959|          40.40326|-3.7012200000000006|                  []|
|20952874|          40.42099|-3.6019200000000007|                  []|
|20952875|40.422610000000006|-3.5994900000000007|                  []|
|20952878| 40.42136000000001| -3.601470000000001|                  []|
|20952879| 40.42262000000001| -3.599770000000001|                  []|
|20952881| 40.42905000000001|-3.5970500000000007|                  []|
|20952883| 40.43131000000001|-3.5961000000000007|                  []|
|20952888| 40.42930000000001| -3.596590000000001|                  []|
|20952890| 40.43012000000001|-3.5961500000000006|                  []|
|20952891| 40.43043000000001|-3.5963600000000007|                  []|
|20952892| 40.43057000000001|-3.5969100000000007|                  []|
|20952893| 40.43039000000001|-3.5973200000000007|                  []|
|20952895| 40.42967000000001|-3.5972300000000006|                  []|
+--------+------------------+-------------------+--------------------+
only showing top 20 rows

Вам обязательно стоит взглянуть на проект Atlas (написанный на Java): https://github.com/osmlab/atlas и https://github.com/osmlab/atlas-generator. Он создается разработчиками Apple и позволяет распределенную обработку файлов osm.pbf с помощью Spark.

Я написал искровой источник данных для файлов .pbf. Он использует библиотеки Osmosis и использует обрезку сущностей: https://github.com/igorgatis/spark-osmpbf.

Вы, вероятно, захотите прочитать .pbf и записать в файл паркета, чтобы сделать будущие запросы намного быстрее. Пример использования:

      import io.github.igorgatis.spark.osmpbf.OsmPbfOptions

val df = spark.read
    .format(OsmPbfOptions.FORMAT)
    .options(new OsmPbfOptions()
             .withExcludeMetadata(true)
             .withTagsAsMap(true)
             .toMap)
    .load("path/to/some.osm.pbf")

df.printSchema

Отпечатки:

      root
 |-- entity_type: string (nullable = false)
 |-- id: long (nullable = false)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- nodes: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- index: integer (nullable = false)
 |    |    |-- nodeId: long (nullable = false)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- member_id: long (nullable = false)
 |    |    |-- role: string (nullable = true)
 |    |    |-- type: string (nullable = true)
Другие вопросы по тегам