Потоковая передача данных из CloudSql в Dataflow
В настоящее время мы изучаем, как мы можем обработать большое количество хранилищ данных в базе данных Google Cloud SQL (MySQL), используя Apache Beam/Google Dataflow.
База данных хранит около 200 ГБ данных в одной таблице.
Мы успешно читаем строки из базы данных, используя JdbcIO
, но пока это возможно только в том случае, если мы LIMIT
количество запрашиваемых строк. В противном случае мы столкнемся с проблемой памяти. Я предполагаю по умолчанию SELECT
Запрос пытается загрузить все полученные строки в память.
Каков идиоматический подход к этому? Пакетные запросы SQL? Потоковые результаты?
Мы попытались настроить fetch size
из заявления выполнено, без особого успеха.
Вот как выглядит наша установка чтения JDBC:
JdbcReadOptions(
connectionOptions = connOpts,
query = "SELECT data FROM raw_data",
statementPreparator = statement => statement.setFetchSize(100),
rowMapper = result => result.getString(1)
)
Я не нашел никаких ресурсов относительно потока из SQL до сих пор.
РЕДАКТИРОВАТЬ
Я перечислю подходы, которые я выбрал, чтобы другие могли чему-то научиться (например, как этого не делать). Чтобы иметь немного больше контекста, рассматриваемая таблица базы данных действительно плохо структурирована: в ней есть столбец, содержащий строку JSON, и id
столбец (первичный ключ) плюс added
а также modified
столбец (оба TIMESTAMP
типов). Во время первого подхода у него не было никаких дальнейших показателей. Таблица содержит 25 миллионов строк. Так что это, вероятно, скорее проблема с базой данных, чем проблема Apache Beam/JDBC. НО ТЕМ НЕМЕНЕЕ:
Подход 1 (выше) - запрашивать все
В основном это выглядело так:
val readOptions = JdbcReadOptions(
connectionOptions = connOpts,
query = "SELECT data FROM raw_data",
rowMapper = result => result.getString(1)
)
context
.jdbcSelect(readOptions)
.map(/*...*/)
Это сработало, если я добавил LIMIT
на запрос. Но, очевидно, было очень медленно.
Подход 2 - Пагинация набора ключей
val queries = List(
"SELECT data from raw_data LIMIT 5000 OFFSET 0",
"SELECT data from raw_data LIMIT 5000 OFFSET 5000",
"SELECT data from raw_data LIMIT 5000 OFFSET 10000"
// ...
)
context
.parallelize(queries)
.map(query => {
val connection = DriverManager.getConnection(/* */)
val statement = connection.prepareStatement(query)
val result = statement.executeQuery()
makeIterable(result) // <-- creates a Iterator[String]
})
.flatten
.map(/* processing */)
Это сработало несколько лучше, хотя я быстро понял, что LIMIT _ OFFSET _
комбинация также начинает сканирование с первого ряда. Таким образом, каждый последующий запрос занимал больше времени, приближаясь к долгому времени.
Подход 2.5 - Раскладка клавиатуры с упорядочением
Как и вышеупомянутый подход, но мы создали индекс на added
столбец и обновил запрос
SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x
Это ускорило процесс, но со временем время запроса увеличилось.
Подход 3 - Нет луча / потока данных
val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
val rs = statement.executeQuery("SELECT data FROM raw_data")
while(rs.next()) {
writer writeLine rs.getString(1)
}
Это поток результатов обратно построчно и записывает строки в файлы. Он пробежал около 2 часов для всех 25 миллионов записей. В заключение. Было бы замечательно, если бы кто-то мог указать, как это решение может быть достигнуто с помощью Beam.
Кстати, теперь, когда у меня есть необработанные данные, обработка CSV-файлов с помощью Beam очень проста. Это около 80 ГБ необработанных данных, которые можно преобразовать в другой формат CSV за 5 минут с автоматическим масштабированием и т. Д.
2 ответа
Я думаю, что JDBCIO не очень хорошо масштабируется из-за присущих ему ограничений (один SELECT). Я не знаю о поддержке потоковой передачи из MySQL и BEAM.
Вероятно, вы можете записать свою БД в более простую для обработки систему обработки данных (например, в csv). Работает ли это для вас?
Кажется, что драйвер MySQL JDBC требует некоторых специальных мер, чтобы он не загружал весь набор результатов в память; например, я смог найти этот код, решающий проблему в другом проекте. JdbcIO
нужно будет сделать то же самое, или, по крайней мере, быть достаточно настраиваемым, чтобы позволить пользователю сделать это. Я подал вопрос https://issues.apache.org/jira/browse/BEAM-3714.
Между тем, в качестве обходного пути, вы можете использовать JdbcIO.readAll()
разделить ваш запрос на множество небольших запросов, например, вы можете разделить его по диапазону идентификаторов. Обратите внимание, что между ними не будет обеспечена согласованность транзакций - они будут независимыми запросами в отношении MySQL.