Могу ли я преобразовать следующее в код TPL?

У меня есть следующий цикл, который уведомляет список наблюдателей определенного события:

 foreach (var observer in registeredObservers)
{
    if (observer != null)
    {
        observer.OnMessageRecieveEvent(new ObserverEvent(item));
    }
}

Есть ли способ, с помощью которого я могу использовать TPL для одновременного уведомления всех зарегистрированных наблюдателей?

Вот код, который выполняется в OnMessageRecieveEvent()

 public void OnMessageRecieveEvent(ObserverEvent e)
    {
        SendSignal(e.message.payload);
    }

 private void SendSignal(Byte[] signal)
    {
        if (state.WorkSocket.Connected)
        {
            try
            {
                // Sends async
                state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback), state.WorkSocket);
            }
            catch (Exception e)
            {                    
                log.Error("Transmission Failier for ip: " + state.WorkSocket.AddressFamily , e);
            }
        }
        else
        {
            CloseConnection();
        }
    }

Итак, мои вопросы:

  1. Как я могу это сделать:

  2. Я действительно хочу сделать это? Будет ли это полезно для производительности?

3 ответа

Решение

Ваш foreach цикл записывается как Parallel.ForEach, Примерно.

Parallel.ForEach(registeredObservers, (obs, item) =>
        {
            if (obs != null) 
               obs.OnMessageReceivedEvent(new ObserverEvent(item));
        });

Поскольку единственная итерация вашего цикла состоит в том, чтобы запустить асинхронную операцию с сокетом (которая сама по себе очень быстра), вы, скорее всего, не выиграете от распараллеливания вашего кода.

Вы можете попробовать использовать TaskCompletionSource или Task.FromAsync способ конвертировать ваши SendSignal метод к Task возвращение одного. Затем вы можете просто создать список задач и дождаться результата после того, как выкинули все уведомления.

Код может выглядеть примерно так (непроверенный и не скомпилированный):

public async Task NotifyObservers()
{
    List<Task> notifyTasks = new List<Task>();

    foreach (var observer in registeredObservers)
    {
        if (observer != null)
        {
            notifyTasks.Add(observer.OnMessageRecieveEvent(new ObserverEvent(item)));
        }
    }

    // asynchronously wait for all the tasks to complete
    await Task.WhenAll(notifyTasks);    
}

 public async Task OnMessageRecieveEvent(ObserverEvent e)
 {
    await SendSignal(e.message.payload);
 }

  private Task SendSignal(Byte[] signal)
  {
        if (!state.WorkSocket.Connected)
        {
            CloseConnection();
            return Task.FromResult<object>(null);
        }
        else
        {
            var tcs = new TaskCompletionSource<object>();

            try
            {
                // Sends async
                state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, (ar) =>
                {
                    try
                    {
                        var socket = (Scoket)ar.AsyncState;
                        tcs.SetResult(socket.EndSend(ar));
                    }
                    catch(Exception ex)
                    {
                        tcs.SetException(ex);
                    }

                }
                , state.WorkSocket);
            }
            catch (Exception e)
            {                    
                log.Error("Transmission Failier for ip: " + state.WorkSocket.AddressFamily , e);
                tcs.SetException(e);
            }
        }

       return tcs.Task;
    }
Другие вопросы по тегам