Кирпичи данных Spark CREATE TABLE - это навсегда для 1 миллиона маленьких файлов XML

У меня есть набор из 1 миллиона XML-файлов, каждый размером ~14 КБ в хранилище BLOB- объектов Azure, смонтированный в блоке данных Azure, и я пытаюсь использовать CREATE TABLE, с ожиданием одной записи для каждого файла.

Эксперимент

Структура содержимого файлов изображена ниже. Для простоты и производительности всего содержимого файлов, кроме <ID> элемент остается идентичным.

<OBSERVATION>
  <HEADER>...</HEADER>
  <RESULT>
    <ID>...</ID>
    <VALUES>...</VALUES>
  </RESULT>
</OBSERVATION>

Для анализа / десериализации я использую spark-xml от Databricks. В данный момент я ожидаю записи с двумя столбцами HEADER а также RESULT что я и получаю.

CREATE TABLE Observations
USING XML
OPTIONS (
  path "/mnt/blobstorage/records/*.xml",
  rowTag "RESULT",
  rootTag "OBSERVATION",
  excludeAttribute True
)

Эта проблема

CREATE TABLE Оператор выполняется в течение 5,5 часов (SQL-запрос, имеющий имя sql at SQLDriverLocal.scala:87 в интерфейсе Spark), из которых только 1 час тратится на задания Spark (на вкладке "Задания" в интерфейсе Spark).

Я заметил, что клетка с CREATE TABLE команда остается в Listing files at "/mnt/blobstorage/records/*.xml" большую часть времени. Сначала я подумал, что это проблема масштабирования в разъеме хранения. Тем не менее, я могу выполнить команду для ~500K файлов JSON аналогичного размера за ~25 секунд (проблема с XML против JSON?).

Я тоже знаю что spark-xml читает все файлы, чтобы вывести схему, что может быть узким местом. Чтобы исключить эту возможность, я попытался:

  • предопределить схему (только из первого XML-файла)
  • глотать как открытый текст без разбора (используя TEXT провайдер). Та же проблема сохраняется в обоих случаях.

Тот же оператор выполняется в течение 20 секунд для записей 10K и через 30 минут для записей 200K. При линейном масштабировании (что, очевидно, не происходит), 1 миллион записей были бы сделаны за ~33 минуты.

Мой кластер Databricks имеет 1 рабочий узел и 3 узла драйверов, каждый из которых имеет 256 ГБ ОЗУ и 64 ядра, поэтому не должно быть узкого места в кэшировании. Я успешно воспроизвел проблему в несколько прогонов в течение 4 дней.

Вопрос

Что я здесь не так делаю? Если есть разделение / кластеризация, я могу сделать это во время CREATE TABLE, как мне это сделать?

1 ответ

Решение

Я догадываюсь, что у вас небольшая проблема с файлами, поскольку вы обрабатываете только 15 ГБ. Я бы объединял маленькие файлы в большие файлы каждый раз. Размер 250 МБ. Поскольку ваш набор данных еще мал, вы можете сделать это на драйвере. Следующий код показывает, как выполняется слияние на узле драйвера (без учета оптимального размера файла):

1. Скопируйте файлы из Blob в локальную файловую систему и сгенерируйте скрипт для слияния файлов:

# copy files from mounted storage to driver local storage
dbutils.fs.cp("dbfs:/mnt/blobstorage/records/", "file:/databricks/driver/temp/records", recurse=True)  

unzipdir= 'temp/records/'
gzipdir= 'temp/gzip/'

# generate shell-script and write it into the local filesystem
script = "cat " + unzipdir + "*.xml > " + gzipdir + """all.xml gzip """ + gzipdir + "all.xml"
dbutils.fs.put("file:/databricks/driver/scripts/makeone.sh", script, True)

2. Запустите скрипт оболочки

%sh
sudo sh ./scripts/makeone.sh

3. Скопируйте файлы обратно в смонтированное хранилище.

dbutils.fs.mv("file:/databricks/driver/" + gzipdir, "dbfs:/mnt/mnt/blobstorage/recordsopt/", recurse=True) 

Другим важным моментом является то, что библиотека spark-xml использует два этапа:

  1. Он анализирует данные, чтобы вывести схему. Если параметр samplingRatio не изменяется, он делает это для всего набора данных. Зачастую этого достаточно сделать только для небольшой выборки, или вы можете предварительно определить схему (используйте для этого схему параметров), тогда вам не нужен этот шаг.
  2. Чтение данных.

Наконец, я бы порекомендовал хранить данные в паркете, поэтому выполняйте более сложные запросы в формате на основе столбцов, чем непосредственно в xmls, и используйте lib spark-xml lib для этого этапа предварительной обработки.

Другие вопросы по тегам