Как прочитать миллионы строк из текстового файла и быстро вставить в таблицу
Я быстро перешел по ссылке Вставить 2 миллиона строк в SQL Server и обнаружил, что могу сделать это с помощью групповой вставки. Поэтому я пытаюсь создать таблицу данных (код, как показано ниже), но так как это огромный файл (более 300K строк), я получаю OutOfMemoryEexception
в моем коде:
string line;
DataTable data = new DataTable();
string[] columns = null;
bool isInserted = false;
using (TextReader tr = new StreamReader(_fileName, Encoding.Default))
{
if (columns == null)
{
line = tr.ReadLine();
columns = line.Split(',');
}
for (int iColCount = 0; iColCount < columns.Count(); iColCount++)
{
data.Columns.Add("Column" + iColCount, typeof(string));
}
string[] columnVal;
while ((line = tr.ReadLine()) != null)
{
columnVal = line.Split(','); // OutOfMemoryException throwing in this line
data.Rows.Add(columnVal);
}
}
после долгой работы я изменил свой код, как показано ниже, но затем я также получаю OutOfMemoryException во время добавления строк в datatable
DataTable data = new DataTable();
string[] columns = null;
var line = string.Empty;
using (TextReader tr = new StreamReader(_fileName, Encoding.Default))
{
if (columns == null)
{
line = tr.ReadLine();
columns = line.Split(',');
}
for (int iColCount = 0; iColCount < columns.Count(); iColCount++)
{
data.Columns.Add("Column" + iColCount, typeof(string));
}
}
// Split the rows in 20000 rows in different list
var _fileList = File.ReadLines(_fileName, Encoding.Default).ToList();
var splitChunks = new List<List<string>>();
splitChunks = SplitFile(_fileList, 20000);
Parallel.ForEach(splitChunks, lstChunks =>
{
foreach (var rows in lstChunks)
{
string[] lineFields = rows.Split(',');
DataRow row = datatbl.NewRow();
for (int iCount = 0; iCount < lineFields.Count(); iCount++)
{
row[iCount] = lineFields[iCount] == string.Empty ? "" : lineFields[iCount].ToString();
}
datatbl.Rows.Add(row);
}
});
Я могу сделать массовую вставку для следующего уровня, как показано ниже:
SqlConnection SqlConnectionObj = GetSQLConnection();
SqlBulkCopy bulkCopy = new SqlBulkCopy(SqlConnectionObj, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
bulkCopy.DestinationTableName = "TempTable";
bulkCopy.WriteToServer(data);
Файл содержит ниже вид данных
4714,1370, AUSRICHTEN MASCHINELL
4870,1370, PLATTE STECKEN
0153,1900, CAULK GUN
0154,1900, НОВЫЙ ТЕРМИНАТОР
0360,1470, MU 186 MACCH. X LAV. S / A ASTE PS174
9113-H22,1970, MC БУРОВЫЕ СВЕРЛА
Код должен преобразовать это в 6 строк и 3 столбца.
Есть ли какой-нибудь более быстрый способ достижения вышеуказанной функциональности для чтения файла и создания таблицы данных для массовой вставки? Так что я не должен получить память из индекса исключения.
Заранее спасибо.
3 ответа
Причина, по которой вы получаете
OutOfMemoryException
потому что вы создаете таблицу данных в памяти и пытаетесь вставить в нее 300К строк
Это много данных для хранения в памяти.
Вместо этого вы должны делать каждое определенное количество строк, которые вы читаете из текстового файла - вам нужно вставить его в базу данных.
То, как вы это делаете, зависит от вас, вы можете использовать SQL или массовое копирование, но имейте в виду, что вы не можете прочитать весь текстовый файл и сохранить его в памяти, так что делайте это частями.
Решение с помощью SqlBulkCopy.WriteToServer и IDataReader. Я использую CSV, но я надеюсь, что это будет легко изменить для других типов. SqlBulkCopy
использует только 3 вещи из IDateReader
и мы должны их реализовать:
public int FieldCount {get; }
public bool Read()
public object GetValue(int i)
Все остальные свойства и методы могут быть не реализованы. Интересная статья о SqlBulkCopy. Полный код: https://dotnetfiddle.net/giG3Ai. Вот с урезанной версией:
namespace SqlBulkCopy
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Diagnostics;
using System.Data;
using System.Data.SqlClient;
public class CsvReader : IDataReader
{
private readonly char CSV_DELIMITER = ',';
private readonly StreamReader _sr;
private readonly Dictionary<string, Func<string, object>> _csv2SqlType;
private readonly string[] _headers;
private string _line;
private string[] _values;
public int FieldCount { get { return _headers.Length; } }
public CsvReader(string filePath, Dictionary<string, Func<string, object>> csvColumn2SqlTypeDict)
{
if (string.IsNullOrEmpty(filePath))
throw new ArgumentException("is null or empty", "filePath");
if (!System.IO.File.Exists(filePath))
throw new IOException(string.Format("{0} doesn't exist or access denied", filePath));
if (csvColumn2SqlTypeDict == null)
throw new ArgumentNullException("csvColumn2SqlTypeDict");
_sr = new StreamReader(filePath);
_csv2SqlType = csvColumn2SqlTypeDict;
_headers = ReadHeaders();
ValidateHeaders();
}
public object GetValue(int i)
{
// Get column value
var colValue = _values[i];
// Get column name
var colName = _headers[i];
// Try to convert to SQL type
try { return _csv2SqlType[colName](colValue); }
catch { return null; }
}
public bool Read()
{
if (_sr.EndOfStream) return false;
_line = _sr.ReadLine();
_values = _line.Split(CSV_DELIMITER);
// If row is invalid, go to next row
if (_values.Length != _headers.Length)
return Read();
return true;
}
public void Dispose()
{
_sr.Dispose();
}
private void ValidateHeaders()
{
if (_headers.Length != _csv2SqlType.Keys.Count)
throw new InvalidOperationException(string.Format("Read {0} columns, but csv2SqlTypeDict contains {1} columns", _headers.Length, _csv2SqlType.Keys));
foreach (var column in _headers)
{
if (!_csv2SqlType.ContainsKey(column))
throw new InvalidOperationException(string.Format("There is no convertor for column '{0}'", column));
}
}
private string[] ReadHeaders()
{
var headerLine = _sr.ReadLine();
if (string.IsNullOrEmpty(headerLine))
throw new InvalidDataException("There is no header in CSV!");
var headers = headerLine.Split(CSV_DELIMITER);
if (headers.Length == 0)
throw new InvalidDataException("There is no header in CSV after Split!");
return headers;
}
}
public class Program
{
public static void Main(string[] args)
{
// Converter from CSV columns to SQL columns
var csvColumn2SqlTypeDict = new Dictionary<string, Func<string, object>>
{
{ "int", (s) => Convert.ToInt32(s) },
{ "str", (s) => s },
{ "double", (s) => Convert.ToDouble(s) },
{ "date", (s) => Convert.ToDateTime(s) },
};
Stopwatch sw = Stopwatch.StartNew();
try
{
// example.csv
/***
int,str,double,date
1,abcd,2.5,15.04.2002
2,dab,2.7,15.04.2007
3,daqqb,4.7,14.04.2007
***/
using (var csvReader = new CsvReader("example.csv", csvColumn2SqlTypeDict))
{
// TODO!!! Modify to your Connection string
var cs = @"Server=localhost\SQLEXPRESS;initial catalog=TestDb;Integrated Security=true";
using (var loader = new SqlBulkCopy(cs, SqlBulkCopyOptions.Default))
{
// TODO Modify to your Destination table
loader.DestinationTableName = "Test";
// Write from csvReader to database
loader.WriteToServer(csvReader);
}
}
}
catch(Exception ex)
{
Console.WriteLine("Got an exception: {0}", ex);
Console.WriteLine("Press 'Enter' to quit");
Console.ReadLine();
return;
}
finally { sw.Stop(); }
Console.WriteLine("Data has been written in {0}", sw.Elapsed);
Console.WriteLine("Press 'Enter' to quit");
Console.ReadLine();
}
private static void ShowCsv(IDataReader dr)
{
int i = 0;
while (dr.Read())
{
Console.WriteLine("Row# {0}", i);
for (int j = 0; j < dr.FieldCount; j++)
{
Console.WriteLine("{0} => {1}", j, dr.GetValue(j));
}
i++;
}
}
}
}
Я обнаружил, что забыл DataTable и быстрее использовал простой старый SQLClient для каждой строки. Проще тоже. Это также превзошло функцию потокового SQL, которая, предположительно, была самым быстрым способом получения данных в БД SQL Server.
Попробуйте и измерьте скорость, посмотрите, достаточно ли она для вас. Если это не так, вы всегда можете попытаться переформатировать файл (при необходимости) и позволить SQL Server выполнить всю работу за вас, используя пакетную вставку.