Предоставление потока в качестве источника данных для двоичного столбца при использовании SqlBulkCopy
Если нужно читать данные из SqlServer в потоковом режиме, для этого есть некоторые возможности. Такие как использование SqlDataReader
с CommandBehavior.SequentialAccess
и, в частности, когда необходимо получить доступ к данным двоичного столбца, существует GetStream(int)
метод для этого:
var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";
using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
dr.Read();
var stream = dr.GetStream(0);
// access stream
}
Но как насчет потоковой передачи данных в обратном направлении, когда нужно передать данные в SqlServer, используя SqlBulkCopy
и особенно если поток должен быть предоставлен как источник данных для двоичного столбца?
Я пытался следовать
var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();
using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
sbc.DestinationTableName = "#Test";
sbc.EnableStreaming = true;
sbc.ColumnMappings.Add(0, "ID");
sbc.ColumnMappings.Add(1, "Data");
sbc.WriteToServer(new TestDataReader());
}
куда TestDataReader
инвентарь IDataReader
следующее:
class TestDataReader : IDataReader
{
public int FieldCount { get { return 2; } }
int rowCount = 1;
public bool Read() { return (rowCount++) < 3; }
public bool IsDBNull(int i) { return false; }
public object GetValue(int i)
{
switch (i)
{
case 0: return rowCount;
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
default: throw new Exception();
}
}
//the rest members of IDataReader
}
и это сработало, как ожидалось.
Однако меняется
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
в
case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });
вызванное исключение System.InvalidOperationException
с сообщением
Заданное значение типа MemoryStream из источника данных не может быть преобразовано в тип varbinary указанного целевого столбца.
Есть ли способ поставки Stream
от IDataReader
(или возможно DbDataReader
) чтобы SqlBulkCopy
как источник данных для двоичного столбца, не копируя все его данные в память (байтовый массив) в первую очередь?
2 ответа
Не уверен, что это где-то задокументировано, но если сделать краткий осмотр SqlBulkCopy
Исходный код вы можете обнаружить, что он обрабатывает различные читатели данных по-разному. Первый, SqlBulkCopy
поддерживает потоковую передачу и GetStream
, но вы можете заметить, что IDataReader
интерфейс не содержит GetStream
метод. Поэтому, когда вы кормите обычай IDataReader
внедрение в SqlBulkCopy
- он не будет обрабатывать двоичные столбцы как потоковые и не будет принимать значения Stream
тип.
С другой стороны - DbDataReader
действительно есть этот метод. Если вы кормите SqlBulkCopy
экземпляр DbDataReader
наследуемый класс - он будет обрабатывать все двоичные столбцы в потоковом режиме и будет вызывать DbDataReader.GetStream
,
Итак, чтобы исправить вашу проблему - наследовать от DbDataReader
как это:
class TestDataReader : DbDataReader
{
public override bool IsDBNull(int ordinal) {
return false;
}
public override int FieldCount { get; } = 2;
int rowCount = 1;
public override bool HasRows { get; } = true;
public override bool IsClosed { get; } = false;
public override bool Read()
{
return (rowCount++) < 3;
}
public override object GetValue(int ordinal) {
switch (ordinal) {
// do not return anything for binary column here - it will not be called
case 0:
return rowCount;
default:
throw new Exception();
}
}
public override Stream GetStream(int ordinal) {
// instead - return your stream here
if (ordinal == 1)
return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
throw new Exception();
}
// bunch of irrelevant stuff
}
Смотрите следующий код
static int SendOrders(int totalToSend)
{
using (SqlConnection con = new SqlConnection(connectionString))
{
con.Open();
using (SqlTransaction tran = con.BeginTransaction())
{
var newOrders =
from i in Enumerable.Range(0, totalToSend)
select new Order
{
customer_name = "Customer " + i % 100,
quantity = i % 9,
order_id = i,
order_entry_date = DateTime.Now
};
SqlBulkCopy bc = new SqlBulkCopy(con,
SqlBulkCopyOptions.CheckConstraints |
SqlBulkCopyOptions.FireTriggers |
SqlBulkCopyOptions.KeepNulls, tran);
bc.BatchSize = 1000;
bc.DestinationTableName = "order_queue";
bc.WriteToServer(newOrders.AsDataReader());
tran.Commit();
}
con.Close();
}
return totalToSend;
}