Массовая вставка занимает больше времени, чем ожидалось, используя Dapper

Прочитав эту статью, я решил поближе познакомиться с тем, как я использовал Dapper.

Я запустил этот код в пустой базе данных

var members = new List<Member>();
for (int i = 0; i < 50000; i++)
{
    members.Add(new Member()
    {
        Username = i.toString(),
        IsActive = true
    });
}

using (var scope = new TransactionScope())
{
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}

это заняло около 20 секунд. Это 2500 вставок в секунду. Неплохо, но не отлично, учитывая, что блог набирает 45 тыс. Вставок в секунду. Есть ли более эффективный способ сделать это в Dapper?

Также, как примечание, выполнение этого кода через отладчик Visual Studio заняло более 3 минут! Я полагал, что отладчик немного замедлит его, но я был очень удивлен, увидев это.

ОБНОВИТЬ

Так это

using (var scope = new TransactionScope())
{
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}

и это

    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

оба заняли 20 секунд.

Но это заняло 4 секунды!

SqlTransaction trans = connection.BeginTransaction();

connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);

trans.Commit();

5 ответов

Решение

Лучшее, чего я смог достичь, это 50k записей за 4 секунды, используя этот подход

SqlTransaction trans = connection.BeginTransaction();

connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);

trans.Commit();

Недавно я наткнулся на это и заметил, что TransactionScope создается после открытия соединения (я предполагаю, что, поскольку Dappers Execute не открывает соединение, в отличие от Query). В соответствии с ответом Q4 здесь: /questions/21363674/pri-kakih-obstoyatelstvah-sqlconnection-avtomaticheski-zachislyaetsya-v-vneshnyuyu-tranzaktsiyu-transactionscope/21363693#21363693 что не приведет к тому, что соединение будет обрабатываться TransactionScope. Мой коллега провел несколько быстрых тестов, и открытие соединения за пределами TransactionScope резко снизило производительность.

Таким образом, изменение на следующее должно работать:

// Assuming the connection isn't already open
using (var scope = new TransactionScope())
{
    connection.Open();
    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

    scope.Complete();
}

Я создал метод расширения, который позволит вам очень быстро выполнять массовую вставку.

public static class DapperExtensions
{
    public static async Task BulkInsert<T>(
        this IDbConnection connection,
        string tableName,
        IReadOnlyCollection<T> items,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        const int MaxBatchSize = 1000;
        const int MaxParameterSize = 2000;

        var batchSize = Math.Min((int)Math.Ceiling((double)MaxParameterSize / dataFunc.Keys.Count), MaxBatchSize);
        var numberOfBatches = (int)Math.Ceiling((double)items.Count / batchSize);
        var columnNames = dataFunc.Keys;
        var insertSql = $"INSERT INTO {tableName} ({string.Join(", ", columnNames.Select(e => $"[{e}]"))}) VALUES ";
        var sqlToExecute = new List<Tuple<string, DynamicParameters>>();

        for (var i = 0; i < numberOfBatches; i++)
        {
            var dataToInsert = items.Skip(i * batchSize)
                .Take(batchSize);
            var valueSql = GetQueries(dataToInsert, dataFunc);

            sqlToExecute.Add(Tuple.Create($"{insertSql}{string.Join(", ", valueSql.Item1)}", valueSql.Item2));
        }

        foreach (var sql in sqlToExecute)
        {
            await connection.ExecuteAsync(sql.Item1, sql.Item2, commandTimeout: int.MaxValue);
        }
    }

    private static Tuple<IEnumerable<string>, DynamicParameters> GetQueries<T>(
        IEnumerable<T> dataToInsert,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        var parameters = new DynamicParameters();

        return Tuple.Create(
            dataToInsert.Select(e => $"({string.Join(", ", GenerateQueryAndParameters(e, parameters, dataFunc))})"),
            parameters);
    }

    private static IEnumerable<string> GenerateQueryAndParameters<T>(
        T entity,
        DynamicParameters parameters,
        Dictionary<string, Func<T, object>> dataFunc)
    {
        var paramTemplateFunc = new Func<Guid, string>(guid => $"@p{guid.ToString().Replace("-", "")}");
        var paramList = new List<string>();

        foreach (var key in dataFunc)
        {
            var paramName = paramTemplateFunc(Guid.NewGuid());
            parameters.Add(paramName, key.Value(entity));
            paramList.Add(paramName);
        }

        return paramList;
    }
}

Затем, чтобы использовать этот метод расширения, вы должны написать следующий код:

await dbConnection.BulkInsert(
    "MySchemaName.MyTableName",
    myCollectionOfItems,
    new Dictionary<string, Func<MyObjectToInsert, object>>
        {
            { "ColumnOne", u => u.ColumnOne },
            { "ColumnTwo", u => u.ColumnTwo },
            ...
        });

Это довольно примитивно, и есть возможности для дальнейшего улучшения, например, передача транзакции или значения commandTimeout, но это помогает мне.

С использованием Execute Метод только с одним оператором вставки никогда не выполнит массовую вставку или не будет эффективным. Даже принятый ответ с Transaction не делает Bulk Insert,

Если вы хотите выполнить Bulk Insert, использовать SqlBulkCopy https://msdn.microsoft.com/en-us/library/system.data.sqlclient.sqlbulkcopy

Вы не найдете ничего быстрее, чем это.

Dapper Plus

Отказ от ответственности: я владелец проекта Dapper Plus

Этот проект не бесплатный, но предлагает все массовые операции:

  • BulkInsert
  • BulkUpdate
  • BulkDelete
  • BulkMerge

(Использовать под капотом SqlBulkCopy)

И еще несколько опций, таких как вывод значений идентичности:

// CONFIGURE & MAP entity
DapperPlusManager.Entity<Order>()
                 .Table("Orders")
                 .Identity(x => x.ID);

// CHAIN & SAVE entity
connection.BulkInsert(orders)
          .AlsoInsert(order => order.Items);
          .Include(x => x.ThenMerge(order => order.Invoice)
                         .AlsoMerge(invoice => invoice.Items))
          .AlsoMerge(x => x.ShippingAddress);   

Наша библиотека поддерживает несколько провайдеров:

  • SQL Server
  • SQL Compact
  • оракул
  • MySql
  • PostgreSQL
  • SQLite
  • жар-птица

Я нашел все эти примеры неполными.

Вот некоторый код, который правильно закрывает соединение после использования, а также правильно использует транзакцию для повышения производительности Excecute, основываясь на последних и лучших ответах в этой теме.

using (var scope = new TransactionScope()) 
{
    Connection.Open();
    Connection.Execute(sqlQuery, parameters);

    scope.Complete();
}

Самый быстрый вариант для меня:

            var dynamicParameters = new DynamicParameters();
            var selects = new List<string>();
            for (var i = 0; i < members.Length; i++)
            {
                var member = members[i];
                var pUsername = $"u{i}";
                var pIsActive = $"a{i}";
                dynamicParameters.Add(pUsername, member.Username);
                dynamicParameters.Add(pIsActive, member.IsActive);
                selects.Add("select @{pUsername},@{pIsActive}");
            }
            con.Execute($"insert into Member(Username, IsActive){string.Join(" union all ", selects)}", dynamicParameters);

которые генерируют sql как:

INSERT TABLENAME (Column1,Column2,...)
 SELECT @u0,@a0...
 UNION ALL
 SELECT @u1,@a1...
 UNION ALL
 SELECT @u2,@a2...

этот запрос работает быстрее, потому что sql добавляет набор строк вместо 1 строки за раз. Узким местом является не запись данных, а запись того, что вы делаете в журнале.

Также ознакомьтесь с правилами минимально зарегистрированных транзакций.

Другие вопросы по тегам