Pyspark эффективно выбирает несколько упорядоченных потоков данных в один RDD
Я переделываю конвейер прогнозирования в реальном времени для потоковой передачи данных датчиков Интернета вещей. Конвейер принимает образцы данных датчиков, структурированные как(sensor_id, timestamp, sample_index, value)
поскольку они создаются в исходной системе, сохраняет их локально и запускает пакетные задания pyspark для обучения алгоритмов и создания прогнозов.
В настоящее время данные датчиков сохраняются в локальных файлах на диске по одному файлу для каждого датчика и в HDFS для потоковой передачи искр. Задание потоковой передачи собирает каждую микропакет, вычисляет, сколько образцов поступило для каждого датчика, и решает, какие датчики накопили достаточно новых данных, чтобы сделать новый прогноз. Затем он сопоставляет каждую строку датчика в RDD с методом, который открывает файл данных с помощью python.open
, выполняет сканирование до последней обработанной выборки, берет данные из этой выборки и далее, а также некоторые данные истории, необходимые для прогнозирования, и запускает задание прогнозирования на искровом кластере. Кроме того, каждое фиксированное количество выборок для каждого алгоритма требует переоборудования, которое запрашивает длинную историю из того же хранилища данных и запускается на искровом кластере.
Наконец, RDD, обрабатываемый заданием прогнозирования, выглядит так:
|-----------------------------|
| sensor_id | sensor_data |
|-----------------------------|
| SENSOR_0 | [13,52,43,54,5] |
| SENSOR_1 | [22,42,23,3,35] |
| SENSOR_2 | [43,2,53,64,42] |
|-----------------------------|
Теперь мы сталкиваемся с проблемой масштаба при мониторинге нескольких сотен тысяч датчиков. Кажется, что самая дорогостоящая операция во время процесса - это чтение данных из файлов - задержка в несколько десятков миллисекунд при чтении каждого файла накапливается до неуправляемой задержки для всего задания прогнозирования. Кроме того, хранение данных в виде плоских файлов на диске вообще не масштабируется.
Мы планируем изменить метод хранения, чтобы повысить производительность и обеспечить масштабируемость. Использование баз данных временных рядов (мы пробовали timescaledb и Infxdb) создает проблему запроса данных для всех датчиков в одном запросе, когда каждый датчик нужно запрашивать с разного момента времени, а затем группировать отдельные выборки вsensor_data
column, как показано выше, что очень дорого, вызывает много перемешиваний и даже уступает по производительности решению с плоскими файлами. Мы также пробуем паркетные файлы, но их поведение при однократной записи затрудняет планирование структуры данных, которая будет хорошо работать в этом случае.
tl;dr - я ищу производительную архитектуру для следующего сценария:
- потоковая передача данных с датчиков поступает в режиме реального времени
- когда датчик накапливает достаточно образцов, текущие + исторические данные запрашиваются и отправляются в задание прогнозирования
- каждое задание прогнозирования обрабатывает все датчики, которые достигли порога в последней микропакете
- RDD содержит строки идентификатора датчика и упорядоченный массив всех запрошенных образцов.