Могу ли я преобразовать следующее в код TPL?
У меня есть следующий цикл, который уведомляет список наблюдателей определенного события:
foreach (var observer in registeredObservers)
{
if (observer != null)
{
observer.OnMessageRecieveEvent(new ObserverEvent(item));
}
}
Есть ли способ, с помощью которого я могу использовать TPL для одновременного уведомления всех зарегистрированных наблюдателей?
Вот код, который выполняется в OnMessageRecieveEvent()
public void OnMessageRecieveEvent(ObserverEvent e)
{
SendSignal(e.message.payload);
}
private void SendSignal(Byte[] signal)
{
if (state.WorkSocket.Connected)
{
try
{
// Sends async
state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback), state.WorkSocket);
}
catch (Exception e)
{
log.Error("Transmission Failier for ip: " + state.WorkSocket.AddressFamily , e);
}
}
else
{
CloseConnection();
}
}
Итак, мои вопросы:
Как я могу это сделать:
Я действительно хочу сделать это? Будет ли это полезно для производительности?
3 ответа
Ваш foreach
цикл записывается как Parallel.ForEach
, Примерно.
Parallel.ForEach(registeredObservers, (obs, item) =>
{
if (obs != null)
obs.OnMessageReceivedEvent(new ObserverEvent(item));
});
Поскольку единственная итерация вашего цикла состоит в том, чтобы запустить асинхронную операцию с сокетом (которая сама по себе очень быстра), вы, скорее всего, не выиграете от распараллеливания вашего кода.
Вы можете попробовать использовать TaskCompletionSource
или Task.FromAsync
способ конвертировать ваши SendSignal
метод к Task
возвращение одного. Затем вы можете просто создать список задач и дождаться результата после того, как выкинули все уведомления.
Код может выглядеть примерно так (непроверенный и не скомпилированный):
public async Task NotifyObservers()
{
List<Task> notifyTasks = new List<Task>();
foreach (var observer in registeredObservers)
{
if (observer != null)
{
notifyTasks.Add(observer.OnMessageRecieveEvent(new ObserverEvent(item)));
}
}
// asynchronously wait for all the tasks to complete
await Task.WhenAll(notifyTasks);
}
public async Task OnMessageRecieveEvent(ObserverEvent e)
{
await SendSignal(e.message.payload);
}
private Task SendSignal(Byte[] signal)
{
if (!state.WorkSocket.Connected)
{
CloseConnection();
return Task.FromResult<object>(null);
}
else
{
var tcs = new TaskCompletionSource<object>();
try
{
// Sends async
state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, (ar) =>
{
try
{
var socket = (Scoket)ar.AsyncState;
tcs.SetResult(socket.EndSend(ar));
}
catch(Exception ex)
{
tcs.SetException(ex);
}
}
, state.WorkSocket);
}
catch (Exception e)
{
log.Error("Transmission Failier for ip: " + state.WorkSocket.AddressFamily , e);
tcs.SetException(e);
}
}
return tcs.Task;
}