Pyspark эффективно выбирает несколько упорядоченных потоков данных в один RDD

Я переделываю конвейер прогнозирования в реальном времени для потоковой передачи данных датчиков Интернета вещей. Конвейер принимает образцы данных датчиков, структурированные как(sensor_id, timestamp, sample_index, value) поскольку они создаются в исходной системе, сохраняет их локально и запускает пакетные задания pyspark для обучения алгоритмов и создания прогнозов.

В настоящее время данные датчиков сохраняются в локальных файлах на диске по одному файлу для каждого датчика и в HDFS для потоковой передачи искр. Задание потоковой передачи собирает каждую микропакет, вычисляет, сколько образцов поступило для каждого датчика, и решает, какие датчики накопили достаточно новых данных, чтобы сделать новый прогноз. Затем он сопоставляет каждую строку датчика в RDD с методом, который открывает файл данных с помощью python.open, выполняет сканирование до последней обработанной выборки, берет данные из этой выборки и далее, а также некоторые данные истории, необходимые для прогнозирования, и запускает задание прогнозирования на искровом кластере. Кроме того, каждое фиксированное количество выборок для каждого алгоритма требует переоборудования, которое запрашивает длинную историю из того же хранилища данных и запускается на искровом кластере.

Наконец, RDD, обрабатываемый заданием прогнозирования, выглядит так:

|-----------------------------|
| sensor_id | sensor_data     |
|-----------------------------|
| SENSOR_0  | [13,52,43,54,5] |
| SENSOR_1  | [22,42,23,3,35] |
| SENSOR_2  | [43,2,53,64,42] |
|-----------------------------|

Теперь мы сталкиваемся с проблемой масштаба при мониторинге нескольких сотен тысяч датчиков. Кажется, что самая дорогостоящая операция во время процесса - это чтение данных из файлов - задержка в несколько десятков миллисекунд при чтении каждого файла накапливается до неуправляемой задержки для всего задания прогнозирования. Кроме того, хранение данных в виде плоских файлов на диске вообще не масштабируется.

Мы планируем изменить метод хранения, чтобы повысить производительность и обеспечить масштабируемость. Использование баз данных временных рядов (мы пробовали timescaledb и Infxdb) создает проблему запроса данных для всех датчиков в одном запросе, когда каждый датчик нужно запрашивать с разного момента времени, а затем группировать отдельные выборки вsensor_datacolumn, как показано выше, что очень дорого, вызывает много перемешиваний и даже уступает по производительности решению с плоскими файлами. Мы также пробуем паркетные файлы, но их поведение при однократной записи затрудняет планирование структуры данных, которая будет хорошо работать в этом случае.

tl;dr - я ищу производительную архитектуру для следующего сценария:

  1. потоковая передача данных с датчиков поступает в режиме реального времени
  2. когда датчик накапливает достаточно образцов, текущие + исторические данные запрашиваются и отправляются в задание прогнозирования
  3. каждое задание прогнозирования обрабатывает все датчики, которые достигли порога в последней микропакете
  4. RDD содержит строки идентификатора датчика и упорядоченный массив всех запрошенных образцов.

0 ответов

Другие вопросы по тегам