Стандартный алгоритм или шаблон для чтения больших данных параллельно с использованием JDBC в приложении Java
Ниже приведена простая программа, которая читает данные из MySQL и сохраняет их в файле CSV. Если запрос возвращает более 10 миллионов записей, он будет медленным.
Я полностью понимаю, что, чтобы сделать параллельно, нам нужно сделать процесс, как
- Получить количество записей из запроса (выберите * из пользователей)
- Затем разбейте запрос на параллельный кусок с соответствующим (выберите * из пользователей, где state = 'CA')
- Затем данные могут быть прочитаны параллельно в 50 потоков или распределены по процессу.
Apache spark использует partition_column с нижним верхним пределом и номером раздела, как показано ниже.
Мне любопытно узнать, есть ли способ / шаблон / алгоритм, который можно использовать в приложении Non-Spark для параллельного получения огромных данных. Однако я посмотрю на код Spark для реализации ниже.
https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3
spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)
.load()
SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000
Простой код MySQL для чтения и хранения данных в файле CSV
public static void main(String[] args)
{
try
{
String myDriver = "org.gjt.mm.mysql.Driver";
String myUrl = "jdbc:mysql://localhost/test";
Class.forName(myDriver);
Connection conn = DriverManager.getConnection(myUrl, "root", "");
String query = "SELECT * FROM users";
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(query);
StringBuilder sb = new StringBuilder();
while (rs.next())
{
int id = rs.getInt("id");
String firstName = rs.getString("first_name");
String lastName = rs.getString("last_name");
Date dateCreated = rs.getDate("date_created");
boolean isAdmin = rs.getBoolean("is_admin");
int numPoints = rs.getInt("num_points");
sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
}
try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
oS.write(sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
st.close();
}
catch (Exception e)
{
System.err.println("Got an exception! ");
System.err.println(e.getMessage());
}
}
1 ответ
Это не точно отвечает на ваш вопрос, но SELECT DATA INTO OUTFILE
может помочь вам быстро экспортировать ваши данные.
Вот пример команды для создания файла CSV в вашем случае,
SELECT *
INTO OUTFILE '/some/path/to/users.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM users;
Это использует быстрый путь для записи данных в вашу файловую систему и может быть быстрее, чем ваш многопоточный подход. Это, конечно, легче программировать.
Это всегда хорошая идея, чтобы предшествовать такой запрос большого объема с SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
чтобы не блокировать вставки и обновления таблицы.
Если вы будете использовать несколько потоков Java для извлечения ваших данных, я предлагаю вам использовать эту стратегию:
Прежде чем создавать темы, определите наибольшее
id
значение, выполнив этот запрос:SELECT MAX(id) FROM users;
Решите, сколько потоков вы будете порождать. Слишком много потоков будет контрпродуктивным, потому что они будут перегружать ваш сервер MySQL. Пятьдесят потоков - это слишком много подключений к вашему серверу MySQL. Используйте четыре или восемь.
Дайте каждому потоку свой сегмент
id
значения для извлечения. Например, если у вас есть десять миллионов строк и четыре потока, сегментами будут [1-2500000], [2500001-5000000], [5000001-7500000] и [7500001-10000000].В каждом потоке откройте соединение jdbc с MySQL и выполните
WHERE id BETWEEN segmentstart AND segmentfinish
правильно выбрать ряды. (MySQL соединения не являются потокобезопасными объектами).Положил
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
перед вашими запросами SELECT.
id
(предположительно) первичный ключ users
стол, так что WHERE
фильтрация с использованием этого будет очень эффективной.