Обработка (OSM) PBF файлов в Spark
Данные OSM доступны в формате PBF. Существуют специализированные библиотеки (например, https://github.com/plasmap/geow для анализа этих данных).
Я хочу сохранить эти данные на S3 и проанализировать данные в СДР как часть задания EMR.
Какой простой способ добиться этого? Могу ли я получить файл на главный узел и обработать его локально? Если так, я бы создал пустое СДР и добавил бы к нему, поскольку потоковые события анализируются из входного файла?
4 ответа
Одним из решений было бы пропустить PBF. Одно искробезопасное представление - Паркет. В этом посте показано, как преобразовать PBF в паркет и как загрузить данные в Spark.
Я выпустил новую версию Osm4Scala, которая включает поддержку Spark 2 и 3.
На README.md много примеров.
Использовать действительно просто:
scala> val osmDF = spark.sqlContext.read.format("osm.pbf").load("<osm files path here>")
osmDF: org.apache.spark.sql.DataFrame = [id: bigint, type: tinyint ... 5 more fields]
scala> osmDF.createOrReplaceTempView("osm")
scala> spark.sql("select type, count(*) as num_primitives from osm group by type").show()
+----+--------------+
|type|num_primitives|
+----+--------------+
| 1| 338795|
| 2| 10357|
| 0| 2328075|
+----+--------------+
scala> spark.sql("select distinct(explode(map_keys(tags))) as tag_key from osm order by tag_key asc").show()
+------------------+
| tag_key|
+------------------+
| Calle|
| Conference|
| Exper|
| FIXME|
| ISO3166-1|
| ISO3166-1:alpha2|
| ISO3166-1:alpha3|
| ISO3166-1:numeric|
| ISO3166-2|
| MAC_dec|
| Nombre|
| Numero|
| Open|
| Peluqueria|
| Residencia UEM|
| Telefono|
| abandoned|
| abandoned:amenity|
| abandoned:barrier|
|abandoned:building|
+------------------+
only showing top 20 rows
scala> spark.sql("select id, latitude, longitude, tags from osm where type = 0").show()
+--------+------------------+-------------------+--------------------+
| id| latitude| longitude| tags|
+--------+------------------+-------------------+--------------------+
| 171933| 40.42006|-3.7016600000000004| []|
| 171946| 40.42125|-3.6844500000000004|[highway -> traff...|
| 171948|40.420230000000004|-3.6877900000000006| []|
| 171951|40.417350000000006|-3.6889800000000004| []|
| 171952| 40.41499|-3.6889800000000004| []|
| 171953| 40.41277|-3.6889000000000003| []|
| 171954| 40.40946|-3.6887900000000005| []|
| 171959| 40.40326|-3.7012200000000006| []|
|20952874| 40.42099|-3.6019200000000007| []|
|20952875|40.422610000000006|-3.5994900000000007| []|
|20952878| 40.42136000000001| -3.601470000000001| []|
|20952879| 40.42262000000001| -3.599770000000001| []|
|20952881| 40.42905000000001|-3.5970500000000007| []|
|20952883| 40.43131000000001|-3.5961000000000007| []|
|20952888| 40.42930000000001| -3.596590000000001| []|
|20952890| 40.43012000000001|-3.5961500000000006| []|
|20952891| 40.43043000000001|-3.5963600000000007| []|
|20952892| 40.43057000000001|-3.5969100000000007| []|
|20952893| 40.43039000000001|-3.5973200000000007| []|
|20952895| 40.42967000000001|-3.5972300000000006| []|
+--------+------------------+-------------------+--------------------+
only showing top 20 rows
Вам обязательно стоит взглянуть на проект Atlas (написанный на Java): https://github.com/osmlab/atlas и https://github.com/osmlab/atlas-generator. Он создается разработчиками Apple и позволяет распределенную обработку файлов osm.pbf с помощью Spark.
Я написал искровой источник данных для файлов .pbf. Он использует библиотеки Osmosis и использует обрезку сущностей: https://github.com/igorgatis/spark-osmpbf.
Вы, вероятно, захотите прочитать .pbf и записать в файл паркета, чтобы сделать будущие запросы намного быстрее. Пример использования:
import io.github.igorgatis.spark.osmpbf.OsmPbfOptions
val df = spark.read
.format(OsmPbfOptions.FORMAT)
.options(new OsmPbfOptions()
.withExcludeMetadata(true)
.withTagsAsMap(true)
.toMap)
.load("path/to/some.osm.pbf")
df.printSchema
Отпечатки:
root
|-- entity_type: string (nullable = false)
|-- id: long (nullable = false)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- nodes: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- index: integer (nullable = false)
| | |-- nodeId: long (nullable = false)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- member_id: long (nullable = false)
| | |-- role: string (nullable = true)
| | |-- type: string (nullable = true)