Проблемы производительности массовой вставки PostgreSQL/JooQ при загрузке из CSV; как мне улучшить процесс?

Для этого проекта я собираюсь создать веб-версию и сейчас работаю над созданием бэкэнда PostgreSQL (9.x), из которого веб-приложение будет запрашивать.

Прямо сейчас происходит следующее: трассировщик генерирует zip-файл с двумя CSV-файлами в нем, загружает его в базу данных H2 во время выполнения, схема которого такова (и да, я знаю, что SQL можно было бы написать немного лучше):

create table matchers (
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null
);

alter table matchers add primary key(id);

create table nodes (
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null
);

alter table nodes add primary key(id);
alter table nodes add foreign key (matcher_id) references matchers(id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

Теперь, поскольку база данных PostgreSQL сможет обрабатывать более одной трассировки, мне пришлось добавить еще одну таблицу; схема на бэкэнде PostgreSQL выглядит следующим образом (также ниже среднего уровня оповещения SQL; также в parse_info стол, content столбец содержит полный текст проанализированного файла, в zip-файле он хранится отдельно):

create table parse_info (
    id uuid primary key,
    date timestamp not null,
    content text not null
);

create table matchers (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null,
    unique (parse_info_id, id)
);

create table nodes (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null,
    unique (parse_info_id, id)
);

alter table nodes add foreign key (parse_info_id, matcher_id)
    references matchers(parse_info_id, id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

Теперь то, что я сейчас делаю, это берет существующие zip-файлы и вставляет их в базу данных postgresql; Я использую JooQ и его API загрузки CSV.

Процесс немного сложен... Вот текущие шаги:

  • UUID генерируется;
  • Я читаю необходимую информацию из почтового индекса (дата разбора, ввод текста) и записываю запись в parse_info Таблица;
  • Я создаю временные копии CSV для того, чтобы API загрузки JooQ мог использовать его (см. После извлечения кода о том, почему);
  • Я вставляю все сопоставители, затем все узлы.

Вот код:

public final class Zip2Db2
{
    private static final Pattern SEMICOLON = Pattern.compile(";");
    private static final Function<String, String> CSV_ESCAPE
        = TraceCsvEscaper.ESCAPER::apply;

    // Paths in the zip to the different components
    private static final String INFO_PATH = "/info.csv";
    private static final String INPUT_PATH = "/input.txt";
    private static final String MATCHERS_PATH = "/matchers.csv";
    private static final String NODES_PATH = "/nodes.csv";

    // Fields to use for matchers zip insertion
    private static final List<Field<?>> MATCHERS_FIELDS = Arrays.asList(
        MATCHERS.PARSE_INFO_ID, MATCHERS.ID, MATCHERS.CLASS_NAME,
        MATCHERS.MATCHER_TYPE, MATCHERS.NAME
    );

    // Fields to use for nodes zip insertion
    private static final List<Field<?>> NODES_FIELDS = Arrays.asList(
        NODES.PARSE_INFO_ID, NODES.PARENT_ID, NODES.ID, NODES.LEVEL,
        NODES.SUCCESS, NODES.MATCHER_ID, NODES.START_INDEX, NODES.END_INDEX,
        NODES.TIME
    );

    private final FileSystem fs;
    private final DSLContext jooq;
    private final UUID uuid;

    private final Path tmpdir;

    public Zip2Db2(final FileSystem fs, final DSLContext jooq, final UUID uuid)
        throws IOException
    {
        this.fs = fs;
        this.jooq = jooq;
        this.uuid = uuid;

        tmpdir = Files.createTempDirectory("zip2db");
    }

    public void removeTmpdir()
        throws IOException
    {
        // From java7-fs-more (https://github.com/fge/java7-fs-more)
        MoreFiles.deleteRecursive(tmpdir, RecursionMode.KEEP_GOING);
    }

    public void run()
    {
        time(this::generateMatchersCsv, "Generate matchers CSV");
        time(this::generateNodesCsv, "Generate nodes CSV");
        time(this::writeInfo, "Write info record");
        time(this::writeMatchers, "Write matchers");
        time(this::writeNodes, "Write nodes");
    }

    private void generateMatchersCsv()
        throws IOException
    {
        final Path src = fs.getPath(MATCHERS_PATH);
        final Path dst = tmpdir.resolve("matchers.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            // Throwing below is from throwing-lambdas
            // (https://github.com/fge/throwing-lambdas)
            lines.map(this::toMatchersLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toMatchersLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        Arrays.stream(SEMICOLON.split(input, 4))
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void generateNodesCsv()
        throws IOException
    {
        final Path src = fs.getPath(NODES_PATH);
        final Path dst = tmpdir.resolve("nodes.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            lines.map(this::toNodesLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toNodesLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        SEMICOLON.splitAsStream(input)
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void writeInfo()
        throws IOException
    {
        final Path path = fs.getPath(INFO_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            final String[] elements = SEMICOLON.split(reader.readLine());

            final long epoch = Long.parseLong(elements[0]);
            final Instant instant = Instant.ofEpochMilli(epoch);
            final ZoneId zone = ZoneId.systemDefault();
            final LocalDateTime time = LocalDateTime.ofInstant(instant, zone);

            final ParseInfoRecord record = jooq.newRecord(PARSE_INFO);

            record.setId(uuid);
            record.setContent(loadText());
            record.setDate(Timestamp.valueOf(time));

            record.insert();
        }
    }

    private String loadText()
        throws IOException
    {
        final Path path = fs.getPath(INPUT_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            return CharStreams.toString(reader);
        }
    }

    private void writeMatchers()
        throws IOException
    {
        final Path path = tmpdir.resolve("matchers.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(MATCHERS)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(MATCHERS_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void writeNodes()
        throws IOException
    {
        final Path path = tmpdir.resolve("nodes.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(NODES)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(NODES_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void time(final ThrowingRunnable runnable, final String description)
    {
        System.out.println(description + ": start");
        final Stopwatch stopwatch = Stopwatch.createStarted();
        runnable.run();
        System.out.println(description + ": done (" + stopwatch.stop() + ')');
    }

    public static void main(final String... args)
        throws IOException
    {
        if (args.length != 1) {
            System.err.println("missing zip argument");
            System.exit(2);
        }

        final Path zip = Paths.get(args[0]).toRealPath();

        final UUID uuid = UUID.randomUUID();
        final DSLContext jooq = PostgresqlTraceDbFactory.defaultFactory()
            .getJooq();

        try (
            final FileSystem fs = MoreFileSystems.openZip(zip, true);
        ) {
            final Zip2Db2 zip2Db = new Zip2Db2(fs, jooq, uuid);
            try {
                zip2Db.run();
            } finally {
                zip2Db.removeTmpdir();
            }
        }
    }
}

Теперь моя первая проблема... Это намного медленнее, чем загрузка в H2. Вот время для CSV, содержащего 620 сопоставителей и 45746 узлов:

Generate matchers CSV: start
Generate matchers CSV: done (45.26 ms)
Generate nodes CSV: start
Generate nodes CSV: done (573.2 ms)
Write info record: start
Write info record: done (311.1 ms)
Write matchers: start
Write matchers: done (4.192 s)
Write nodes: start
Write nodes: done (22.64 s)

Дайте или возьмите и забудете часть о написании специализированных CSV (см. Ниже), то есть 25 секунд. Загрузка в оперативную базу данных H2 на диске занимает менее 5 секунд!

Другая проблема, с которой я столкнулся, заключается в том, что мне приходится писать специальные CSV; похоже, что API загрузки CSV не очень гибок в том, что он принимает, и мне, например, нужно перевернуть эту строку:

328;SequenceMatcher;COMPOSITE;token

в это:

"some-randome-uuid-here";"328";"SequenceMatcher";"COMPOSITE";"token"

Но моя самая большая проблема в том, что этот почтовый индекс довольно маленький. Например, у меня есть почтовый индекс не с 620, а с 1532 сопоставителями, и не с 45746 узлами, а с более чем 34 миллионами узлов; даже если мы отклоним время генерации CSV (CSV для исходных узлов составляет 1,2 ГиБ), поскольку для впрыска H2 требуется 20 минут, умножение на 5 дает время, составляющее точку к югу от 1h30mn, что очень много!

В целом, процесс в настоящее время довольно неэффективен...


Теперь в защиту PostgreSQL:

  • ограничения для экземпляра PostgreSQL намного выше, чем для экземпляра H2: мне не нужен UUID в сгенерированных zip-файлах;
  • H2 настроен "небезопасно" для записи: jdbc:h2:/path/to/db;LOG=0;LOCK_MODE=0;UNDO_LOG=0;CACHE_SIZE=131072,

Тем не менее, эта разница во времени вставки кажется немного чрезмерной, и я совершенно уверен, что она может быть лучше. Но я не знаю с чего начать.

Кроме того, я знаю, что PostgreSQL имеет специальный механизм для загрузки из CSV, но здесь CSV находятся в zip-файле для начала, и я действительно хотел бы избежать создания отдельного CSV, как я делаю в настоящее время.. В идеале я хотел бы читать строку за строкой непосредственно из zip (что я и делаю для инъекции H2), преобразовывать строку и записывать в схему PostgreSQL.

Наконец, я также знаю, что в настоящее время я не отключаю ограничения на схему PostgreSQL перед вставкой; Я еще не попробовал это (это будет иметь значение?).

Итак, что вы предлагаете мне сделать, чтобы улучшить производительность?

2 ответа

Самый быстрый способ сделать массовую вставку из файла CSV в PostgreSQL - это Copy. Команда COPY оптимизирована для вставки большого количества строк.

В Java вы можете использовать реализацию Copy для драйвера JDBC PostgreSQL

Вот хороший небольшой пример того, как его использовать: как скопировать данные из файла в PostgreSQL с помощью JDBC?

Если у вас есть CSV с заголовками, вы бы хотели выполнить команду, подобную этой:

\COPY mytable FROM '/tmp/mydata.csv' DELIMITER ';' CSV HEADER

Еще одним повышением производительности при добавлении больших объемов данных в существующую таблицу является удаление индексов, вставка данных, а затем повторное создание индексов.

Вот пара мер, которые вы можете предпринять

Обновление до JOOQ 3.6

В jOOQ 3.6 есть два новых режима в Loader API:

Использование этих методов значительно ускорило загрузку на порядки величин. Смотрите также эту статью о производительности пакетной загрузки JDBC.

Сохраняйте журналы UNDO / REDO небольшими

В настоящее время вы загружаете все в одну огромную транзакцию (или вы используете автоматическую фиксацию, но это тоже не хорошо). Это плохо для больших нагрузок, потому что база данных должна отслеживать все вставки в вашем сеансе вставки, чтобы иметь возможность откатывать их при необходимости.

Это становится еще хуже, когда вы делаете это в реальной системе, где такие большие нагрузки вызывают много конфликтов.

jOOQ-х Loader API позволяет указать размер "коммита" через LoaderOptionsStep.commitAfter(int)

Выключите ведение журнала и ограничения полностью

Это возможно только в том случае, если вы загружаете данные в автономном режиме, но это может значительно ускорить загрузку, если вы полностью отключите ведение журнала в своей базе данных (для этой таблицы), а если отключите ограничения во время загрузки, включите их снова после загрузки,

Наконец, я также знаю, что в настоящее время я не отключаю ограничения на схему PostgreSQL перед вставкой; Я еще не попробовал это (это будет иметь значение?).

О да, это будет. В частности, уникальное ограничение стоит дорого для каждой отдельной вставки, так как оно должно поддерживаться все время.

Работать на более простой char[] API манипуляции

Этот код здесь:

final List<String> parts = new ArrayList<>();
parts.add('"' + uuid.toString() + '"');
Arrays.stream(SEMICOLON.split(input, 4))
      .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
      .forEach(parts::add);
return String.join(";", parts) + '\n';

Создает большое давление на сборщик мусора, поскольку вы неявно создаете и отбрасываете много StringBuilder объекты ( некоторую информацию об этом можно найти в этом блоге). Обычно это нормально и не должно быть преждевременно оптимизировано, но в больших пакетных процессах вы, безусловно, можете набрать пару процентов скорости, если преобразуете вышеперечисленное в более низкий уровень:

StringBuilder result = new StringBuilder();
result.append('"').append(uuid.toString()).append('"');

for (String s : SEMICOLON.split(input, 4))
    result.append('"').append(CSV_ESCAPE.apply(s)).append('"');

...

Конечно, вы все еще можете написать то же самое в функциональном стиле, но я обнаружил, что проще оптимизировать эти низкоуровневые строковые операции, используя классические идиомы до Java 8.

Другие вопросы по тегам