Шаблон для вызова службы WCF с использованием async/await
Я сгенерировал прокси с операциями на основе задач.
Как правильно вызвать эту службу (избавиться от ServiceClient
и OperationContext
потом) используя async/await?
Моя первая попытка была:
public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
{
return await helper.Proxy.GetHomeInfoAsync(timestamp);
}
}
бытие ServiceHelper
класс, который создает ServiceClient
и OperationContextScope
и избавляется от них впоследствии:
try
{
if (_operationContextScope != null)
{
_operationContextScope.Dispose();
}
if (_serviceClient != null)
{
if (_serviceClient.State != CommunicationState.Faulted)
{
_serviceClient.Close();
}
else
{
_serviceClient.Abort();
}
}
}
catch (CommunicationException)
{
_serviceClient.Abort();
}
catch (TimeoutException)
{
_serviceClient.Abort();
}
catch (Exception)
{
_serviceClient.Abort();
throw;
}
finally
{
_operationContextScope = null;
_serviceClient = null;
}
Однако это с треском провалилось при одновременном вызове двух сервисов со следующей ошибкой: "Этот OperationContextScope расположен в другом потоке, чем он был создан".
MSDN говорит:
Не используйте асинхронный шаблон "ожидание" в блоке OperationContextScope. Когда происходит продолжение, оно может выполняться в другом потоке, а OperationContextScope зависит от конкретного потока. Если вам нужно вызвать "ожидание" для асинхронного вызова, используйте его за пределами блока OperationContextScope.
Вот в чем проблема! Но как мы можем исправить это правильно?
Этот парень сделал только то, что говорит MSDN:
private async void DoStuffWithDoc(string docId)
{
var doc = await GetDocumentAsync(docId);
if (doc.YadaYada)
{
// more code here
}
}
public Task<Document> GetDocumentAsync(string docId)
{
var docClient = CreateDocumentServiceClient();
using (new OperationContextScope(docClient.InnerChannel))
{
return docClient.GetDocumentAsync(docId);
}
}
Моя проблема с его кодом заключается в том, что он никогда не вызывает Close (или Abort) на ServiceClient.
Я также нашел способ распространения OperationContextScope
используя обычай SynchronizationContext
, Но, помимо того, что это много "рискованного" кода, он утверждает, что:
Стоит отметить, что у него есть несколько небольших проблем, связанных с удалением областей контекста операции (так как они позволяют вам размещать их только в вызывающем потоке), но это, по-видимому, не проблема, поскольку (по крайней мере, согласно разборки), они реализуют Dispose(), но не Finalize().
Итак, нам здесь не повезло? Существует ли проверенный шаблон для вызова служб WCF с использованием async / await и удаления ОБА ServiceClient
и OperationContextScope
? Может быть, кто-то из Microsoft (возможно, гуру Стивен Тауб:)) может помочь.
Спасибо!
[ОБНОВИТЬ]
С большой помощью от пользователя Noseratio я придумал что-то, что работает: не использовать OperationContextScope
, Если вы используете его по какой-либо из этих причин, попробуйте найти обходной путь, соответствующий вашему сценарию. В противном случае, если вам действительно нужно OperationContextScope
вам придется придумать реализацию SynchronizationContext
это захватывает это, и это кажется очень трудным (если это вообще возможно - должна быть причина, почему это не поведение по умолчанию).
Итак, полный рабочий код:
public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
{
return await helper.Proxy.GetHomeInfoAsync(timestamp);
}
}
С ServiceHelper
являются:
public class ServiceHelper<TServiceClient, TService> : IDisposable
where TServiceClient : ClientBase<TService>, new()
where TService : class
{
protected bool _isInitialized;
protected TServiceClient _serviceClient;
public TServiceClient Proxy
{
get
{
if (!_isInitialized)
{
Initialize();
_isInitialized = true;
}
else if (_serviceClient == null)
{
throw new ObjectDisposedException("ServiceHelper");
}
return _serviceClient;
}
}
protected virtual void Initialize()
{
_serviceClient = new TServiceClient();
}
// Implement IDisposable.
// Do not make this method virtual.
// A derived class should not be able to override this method.
public void Dispose()
{
Dispose(true);
// Take yourself off the Finalization queue
// to prevent finalization code for this object
// from executing a second time.
GC.SuppressFinalize(this);
}
// Dispose(bool disposing) executes in two distinct scenarios.
// If disposing equals true, the method has been called directly
// or indirectly by a user's code. Managed and unmanaged resources
// can be disposed.
// If disposing equals false, the method has been called by the
// runtime from inside the finalizer and you should not reference
// other objects. Only unmanaged resources can be disposed.
protected virtual void Dispose(bool disposing)
{
// If disposing equals true, dispose all managed
// and unmanaged resources.
if (disposing)
{
try
{
if (_serviceClient != null)
{
if (_serviceClient.State != CommunicationState.Faulted)
{
_serviceClient.Close();
}
else
{
_serviceClient.Abort();
}
}
}
catch (CommunicationException)
{
_serviceClient.Abort();
}
catch (TimeoutException)
{
_serviceClient.Abort();
}
catch (Exception)
{
_serviceClient.Abort();
throw;
}
finally
{
_serviceClient = null;
}
}
}
}
Обратите внимание, что класс поддерживает расширение; возможно, вам нужно унаследовать и предоставить учетные данные.
Единственная возможная "ошибка" заключается в том, что в GetHomeInfoAsync
Нельзя просто вернуть Task
вы получаете от прокси (что должно показаться естественным, зачем создавать новый Task
когда у вас уже есть). Ну, в этом случае вам нужно await
прокси Task
а затем закройте (или отмените) ServiceClient
иначе вы закроете его сразу после вызова сервиса (пока байты отправляются по проводам)!
Хорошо, у нас есть способ заставить это работать, но было бы неплохо получить ответ из авторитетного источника, как утверждает Носерацио.
7 ответов
Я думаю, что возможное решение может заключаться в использовании специального ожидателя для передачи нового контекста операции через OperationContext.Current
, Реализация OperationContext
само по себе не требует сродства потока. Вот образец:
async Task TestAsync()
{
using(var client = new WcfAPM.ServiceClient())
using (var scope = new FlowingOperationContextScope(client.InnerChannel))
{
await client.SomeMethodAsync(1).ContinueOnScope(scope);
await client.AnotherMethodAsync(2).ContinueOnScope(scope);
}
}
Вот реализация FlowingOperationContextScope
а также ContinueOnScope
(только слегка проверено):
public sealed class FlowingOperationContextScope : IDisposable
{
bool _inflight = false;
bool _disposed;
OperationContext _thisContext = null;
OperationContext _originalContext = null;
public FlowingOperationContextScope(IContextChannel channel):
this(new OperationContext(channel))
{
}
public FlowingOperationContextScope(OperationContext context)
{
_originalContext = OperationContext.Current;
OperationContext.Current = _thisContext = context;
}
public void Dispose()
{
if (!_disposed)
{
if (_inflight || OperationContext.Current != _thisContext)
throw new InvalidOperationException();
_disposed = true;
OperationContext.Current = _originalContext;
_thisContext = null;
_originalContext = null;
}
}
internal void BeforeAwait()
{
if (_inflight)
return;
_inflight = true;
// leave _thisContext as the current context
}
internal void AfterAwait()
{
if (!_inflight)
throw new InvalidOperationException();
_inflight = false;
// ignore the current context, restore _thisContext
OperationContext.Current = _thisContext;
}
}
// ContinueOnScope extension
public static class TaskExt
{
public static SimpleAwaiter<TResult> ContinueOnScope<TResult>(this Task<TResult> @this, FlowingOperationContextScope scope)
{
return new SimpleAwaiter<TResult>(@this, scope.BeforeAwait, scope.AfterAwait);
}
// awaiter
public class SimpleAwaiter<TResult> :
System.Runtime.CompilerServices.INotifyCompletion
{
readonly Task<TResult> _task;
readonly Action _beforeAwait;
readonly Action _afterAwait;
public SimpleAwaiter(Task<TResult> task, Action beforeAwait, Action afterAwait)
{
_task = task;
_beforeAwait = beforeAwait;
_afterAwait = afterAwait;
}
public SimpleAwaiter<TResult> GetAwaiter()
{
return this;
}
public bool IsCompleted
{
get
{
// don't do anything if the task completed synchronously
// (we're on the same thread)
if (_task.IsCompleted)
return true;
_beforeAwait();
return false;
}
}
public TResult GetResult()
{
return _task.Result;
}
// INotifyCompletion
public void OnCompleted(Action continuation)
{
_task.ContinueWith(task =>
{
_afterAwait();
continuation();
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
SynchronizationContext.Current != null ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
}
}
}
Простой способ - переместить ожидание за пределы блока использования.
public Task<Document> GetDocumentAsync(string docId)
{
var docClient = CreateDocumentServiceClient();
using (new OperationContextScope(docClient.InnerChannel))
{
var task = docClient.GetDocumentAsync(docId);
}
return await task;
}
Я решил написать свой собственный код, который поможет с этим, публикуя сообщения на случай, если это кому-нибудь поможет. Кажется, будет немного меньше ошибок (непредвиденные гонки и т. Д.) По сравнению с реализацией SimpleAwaiter, описанной выше, но вы будете судить:
public static class WithOperationContextTaskExtensions
{
public static ContinueOnOperationContextAwaiter<TResult> WithOperationContext<TResult>(this Task<TResult> @this, bool configureAwait = true)
{
return new ContinueOnOperationContextAwaiter<TResult>(@this, configureAwait);
}
public static ContinueOnOperationContextAwaiter WithOperationContext(this Task @this, bool configureAwait = true)
{
return new ContinueOnOperationContextAwaiter(@this, configureAwait);
}
public class ContinueOnOperationContextAwaiter : INotifyCompletion
{
private readonly ConfiguredTaskAwaitable.ConfiguredTaskAwaiter _awaiter;
private OperationContext _operationContext;
public ContinueOnOperationContextAwaiter(Task task, bool continueOnCapturedContext = true)
{
if (task == null) throw new ArgumentNullException("task");
_awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
}
public ContinueOnOperationContextAwaiter GetAwaiter() { return this; }
public bool IsCompleted { get { return _awaiter.IsCompleted; } }
public void OnCompleted(Action continuation)
{
_operationContext = OperationContext.Current;
_awaiter.OnCompleted(continuation);
}
public void GetResult()
{
OperationContext.Current = _operationContext;
_awaiter.GetResult();
}
}
public class ContinueOnOperationContextAwaiter<TResult> : INotifyCompletion
{
private readonly ConfiguredTaskAwaitable<TResult>.ConfiguredTaskAwaiter _awaiter;
private OperationContext _operationContext;
public ContinueOnOperationContextAwaiter(Task<TResult> task, bool continueOnCapturedContext = true)
{
if (task == null) throw new ArgumentNullException("task");
_awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
}
public ContinueOnOperationContextAwaiter<TResult> GetAwaiter() { return this; }
public bool IsCompleted { get { return _awaiter.IsCompleted; } }
public void OnCompleted(Action continuation)
{
_operationContext = OperationContext.Current;
_awaiter.OnCompleted(continuation);
}
public TResult GetResult()
{
OperationContext.Current = _operationContext;
return _awaiter.GetResult();
}
}
}
Использование (небольшое руководство и вложение не проверено...):
/// <summary>
/// Make a call to the service
/// </summary>
/// <param name="action"></param>
/// <param name="endpoint"> </param>
public async Task<ResultCallWrapper<TResult>> CallAsync<TResult>(Func<T, Task<TResult>> action, EndpointAddress endpoint)
{
using (ChannelLifetime<T> channelLifetime = new ChannelLifetime<T>(ConstructChannel(endpoint)))
{
// OperationContextScope doesn't work with async/await
var oldContext = OperationContext.Current;
OperationContext.Current = new OperationContext((IContextChannel)channelLifetime.Channel);
var result = await action(channelLifetime.Channel)
.WithOperationContext(configureAwait: false);
HttpResponseMessageProperty incomingMessageProperty = (HttpResponseMessageProperty)OperationContext.Current.IncomingMessageProperties[HttpResponseMessageProperty.Name];
string[] keys = incomingMessageProperty.Headers.AllKeys;
var headersOrig = keys.ToDictionary(t => t, t => incomingMessageProperty.Headers[t]);
OperationContext.Current = oldContext;
return new ResultCallWrapper<TResult>(result, new ReadOnlyDictionary<string, string>(headersOrig));
}
}
Асинхронный поток поддерживается из.Net 4.6.2.
У нас есть приложение ASP.Net WebApi, работающее на.Net 4.6, где мы использовали принятый ответ. TaskScheduler.FromCurrentSynchronizationContext()
вызвал проблемы взаимоблокировки, когда текущий контекст синхронизации AspNetSynchronizationContext
,
Я полагаю, что задача продолжения была поставлена в очередь после фактической задачи, в результате чего фактическая задача ожидает продолжения, тогда как задача продолжения должна выполняться для выполнения фактической задачи. то есть задачи ждут друг друга.
Поэтому я исправил проблему, изменив с помощью задачи продолжения задачу TaskAwaiter. См.: https://blogs.msdn.microsoft.com/lucian/2012/12/11/how-to-write-a-custom-awaiter/
Это было давно, но я добавлю свой собственный домашний раствор.
Если не против обойтись без OperationContextScope
, можно было бы рассмотреть что-то в этом роде:
Методы расширения
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;
namespace Intexx.ServiceModel
{
public static class WcfExtensions
{
[DebuggerStepThrough]
public static void Call<TChannel>(this TChannel Client, Action<TChannel> Method) where TChannel : ICommunicationObject
{
try
{
Method.Invoke(Client);
}
finally
{
Cleanup(Client);
}
}
[DebuggerStepThrough]
public static TResult Call<TChannel, TResult>(this TChannel Client, Func<TChannel, TResult> Method) where TChannel : ICommunicationObject
{
try
{
return Method.Invoke(Client);
}
finally
{
Cleanup(Client);
}
}
[DebuggerStepThrough]
public async static Task CallAsync<TChannel>(this TChannel Client, Func<TChannel, Task> Method) where TChannel : ICommunicationObject
{
try
{
await Method.Invoke(Client);
}
finally
{
Cleanup(Client);
}
}
[DebuggerStepThrough]
public async static Task<TResult> CallAsync<TChannel, TResult>(this TChannel Client, Func<TChannel, Task<TResult>> Method) where TChannel : ICommunicationObject
{
try
{
return await Method.Invoke(Client);
}
finally
{
Cleanup(Client);
}
}
private static void Cleanup<TChannel>(TChannel Client) where TChannel : ICommunicationObject
{
try
{
if (Client.IsNotNull)
{
if (Client.State == CommunicationState.Faulted)
Client.Abort();
else
Client.Close();
}
}
catch (Exception ex)
{
Client.Abort();
if (!ex is CommunicationException && !ex is TimeoutException)
throw new Exception(ex.Message, ex);
}
finally
{
Client = null;
}
}
}
}
Класс клиента
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;
namespace Reader
{
public class Client
{
public static CemReaderClient Create()
{
Tuple<Channels.Binding, EndpointAddress, double> oService;
try
{
oService = Main.Services(typeof(ICemReader));
return new CemReaderClient(oService.Item1, oService.Item2);
}
catch (KeyNotFoundException ex)
{
return null;
}
}
}
}
Использование (в VB, поскольку код не конвертируется)
Using oReader As Reader.CemReaderClient = Reader.Client.Create
If oReader.IsNotNothing Then
Dim lIsReading = Await oReader.CallAsync(Function(Reader As Reader.CemReaderClient)
Me.ConfigFilePath = If(Me.ConfigFilePath, Reader.GetConfigFilePath)
Me.BackupDrive = If(Me.BackupDrive, Reader.GetBackupDrive)
Me.SerialPort = If(Me.SerialPort, Reader.GetSerialPort)
Me.LogFolder = If(Me.LogFolder, Reader.GetLogFolder)
Return Reader.GetIsReadingAsync
End Function)
End If
End Using
У меня это надежно работало в производстве при частотных нагрузках около 15 вызовов в секунду на стороне клиента (это настолько быстро, насколько позволяет последовательная обработка). Однако это было для одного потока - это не было тщательно проверено на безопасность потоков. YMMV.
В моем случае я решил свернуть методы расширения в их собственный частный пакет NuGet. Вся конструкция оказалась очень удобной.
Это, конечно, придется переоценить, если OperationContextScope
когда-либо оказывается нужным.
Бит с Tuple
в Client
класс предназначен для поддержки Service Discovery. Если кто-то захочет также увидеть этот код, сообщите об этом, и я обновлю свой ответ.
Я немного запутался, я нашел этот блог: асинхронная операция на основе задач в WCF
Там это асинхронная связь wcf:
[ServiceContract]
public interface IMessage
{
[OperationContract]
Task<string> GetMessages(string msg);
}
public class MessageService : IMessage
{
async Task<string> IMessage.GetMessages(string msg)
{
var task = Task.Factory.StartNew(() =>
{
Thread.Sleep(10000);
return "Return from Server : " + msg;
});
return await task.ConfigureAwait(false);
}
}
Клиент:
var client = new Proxy("BasicHttpBinding_IMessage");
var task = Task.Factory.StartNew(() => client.GetMessages("Hello"));
var str = await task;
Так это тоже хороший способ?
Я не знаю, помогает ли это, но, увидев этот вопрос в моем поиске, чтобы ответить на тот же вопрос, я наткнулся на это.
Исходя из этого, я должен думать, что ваш код должен выглядеть примерно так:
public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
using (var client = CreateDocumentServiceClient())
{
await client.BeginGetHomeInfoAsync(timestamp);
}
}
Я понимаю, что мой ответ приходит довольно поздно:P, но это может помочь кому-то еще.
Я столкнулся с той же проблемой, однако до меня дошло, что мне вообще не нужно использовать async / await.
Поскольку вы не выполняете постобработку результата, вам не нужно ждать ответа. Если вам нужно обработать результат, просто используйте старомодное продолжение TPL.
public Task<MyDomainModel> GetHomeInfoAsync(DateTime timestamp)
{
using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
{
return helper.Proxy.GetHomeInfoAsync(timestamp).ContinueWith(antecedent=>processReplay(antecedent.Result));
}
}