Как я могу вернуть поток в ответе WCF асинхронно от задачи?
У меня есть класс, который выполняет длительную обработку большого количества данных и записывает вывод в поток, который я предоставляю. Я пытаюсь использовать WCF (используя именованные каналы), но не могу понять, как вернуть поток. У меня пока что-то вроде этого:
interface IProcessor { Stream GetStream(); }
class Host {
static void Main(string[] args) {
using (ServiceHost sh = new ServiceHost(typeof(Processor), new Uri[]{new Uri("net.pipe://localhost")})) {
var binding = new NetNamedPipeBinding();
binding.TransferMode = TransferMode.StreamedResponse;
sh.AddServiceEndpoint(typeof(IProcessor), binding, "foo");
sh.Open();
Console.WriteLine("Waiting...");
Console.ReadLine();
sh.Close();
}
}
}
class Processor : IProcessor {
Stream GetStream() {
var SourceProcessor = new SourceProcessor(...);
var s = new MemoryStream();
new Task(() => { SourceProcessor.Run(s); }).Start();
return s;
}
}
class Client {
static void Main(string[] args) {
Console.WriteLine("Starting...");
var binding = new NetNamedPipeBinding();
binding.TransferMode = TransferMode.StreamedResponse;
ChannelFactory<IProcessor> f = new ChannelFactory<IProcessor>(binding, new EndpointAddress("net.pipe://localhost/foo"));
Console.WriteLine("Creating channel...");
IProcessor eps = f.CreateChannel();
Console.WriteLine("Getting stream.");
Stream s = eps.GetStream();
StreamReader sr = new StreamReader(s);
while (!sr.EndOfStream) Console.WriteLine(sr.ReadLine());
Console.ReadLine();
}
}
Все проходит через движения, но, конечно, ни одна из исходных данных не доходит до клиента. Я запутался в том, как я могу это сделать (возможно, я не могу), так как мне нужно и вернуть поток, и запустить задачу, и потенциально дождаться ее завершения. Если бы я просто вызывал SourceProcessor.Run (s), не находясь в задаче, якобы он блокировал бы и буферизовал, но я не уверен, как заставить его ждать, пока задача не будет выполнена, и в то же время возвращать поток для чтения клиентом....
1 ответ
Проблема в том, что WCF будет думать, что поток "готов", если он вызывает Read(
и вызов возвращает 0 байтов. MemoryStream
с удовольствием это сделает, он не будет блокировать чтение, если нет доступных данных.
Источник вашей проблемы WCF читает MemoryStream
быстрее, чем вы пишете и думаете, что это "сделано", способ исправить это - вам нужно будет вернуть другой тип Stream
это блокирует вместо возврата 0, когда нет доступных данных. В.NET нет ничего, что могло бы сделать это, вам нужно либо найти сторонний класс, либо создать свой собственный (это может быть так же просто, как получить из MemoryStream и переопределить Read
блокировать чтение, пока не будет установлен флаг "Готово" (см. BlockingCollection<T>
И его CompleteAdding()
метод для похожего поведения)).
Ради забавы я бросил это вместе, это абсолютно не проверено, но это может сделать то, что вам нужно.
using System;
using System.Collections.Concurrent;
using System.IO;
namespace Example
{
public class BufferStream : Stream
{
public BufferStream()
{
_data = new BlockingCollection<byte[]>();
}
/// <param name="boundedCapacity">The maximum number of calls to <see cref="Write"/> that can be made without
/// the buffer being drained.</param>
public BufferStream(int boundedCapacity)
{
_data = new BlockingCollection<byte[]>(boundedCapacity);
}
private readonly BlockingCollection<byte[]> _data;
private byte[] _currentBlock = null;
private int _currentBlockIndex = 0;
public int BoundedCapacity { get { return _data.BoundedCapacity; } }
public int BufferedWrites { get { return _data.Count; } }
public bool IsAddingCompleted
{
get { return _data.IsAddingCompleted; }
}
public bool IsCompleted
{
get { return _data.IsCompleted; }
}
public void CompleteAdding()
{
_data.CompleteAdding();
}
public override void Write(byte[] buffer, int offset, int count)
{
var localArray = new byte[count];
//Copy the data in to a new buffer of exactly the count size.
Array.Copy(buffer, offset, localArray, 0, count);
_data.Add(localArray);
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_currentBlock == null || _currentBlockIndex == _currentBlock.Length)
{
if (!GetNextBlock())
return 0;
}
int minCount = Math.Min(count, _currentBlock.Length - _currentBlockIndex);
Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset, minCount);
_currentBlockIndex += minCount;
return minCount;
}
/// <summary>
/// Loads the next block in to <see cref="_currentBlock"/>.
/// </summary>
/// <returns>True if the next block was retrieved.</returns>
private bool GetNextBlock()
{
if (!_data.TryTake(out _currentBlock))
{
//The TryTake failed, the collection is empty.
//See if we are in the completed state.
if (_data.IsCompleted)
{
return false;
}
//Wait for more data to show up.
try
{
_currentBlock = _data.Take();
}
catch (InvalidOperationException)
{
//If the blocking collection was marked complete while we where waiting Take throws a InvalidOperationException
return false;
}
}
_currentBlockIndex = 0;
return true;
}
#region Constant functions
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
#endregion
}
}
Преимущество этого по сравнению с MemoryStream
как только значение было прочитано WCF, ему больше не нужно оставаться в памяти (весь смысл возврата Stream
вместо byte[]
)