Как очистить буферы исходящих сообщений на Сервере?

Я написал сервис, используя PollingDuplexHttpBinding у которого есть клиент Silverllight, который потребляет его. В основном у моего Сервиса есть определенное количество клиентов, которые отправляют свои данные (довольно часто, каждую секунду, и данные достаточно велики, каждый вызов составляет около 5 КБ), а также прослушивают новые данные, отправленные другими клиентами в сервис. быть направленным на них, очень похоже на архитектуру чата.

Проблема, которую я замечаю, заключается в том, что когда клиенты подключаются к службе через Интернет, через несколько минут ответ службы замедляется, а ответы отстают. Я пришел к выводу, что при достижении пропускной способности хостов службы (скорость загрузки через Интернет, на сервере она составляет около 15 КБ / с), сообщения, отправляемые другими клиентами, буферизуются и обрабатываются соответствующим образом, когда есть доступная пропускная способность. Мне интересно, как именно я могу ограничить захват этого буфера, который служба использует для хранения полученных сообщений от клиентов? Не так важно, чтобы мои клиенты получали все данные, а скорее чтобы они получали последние данные, отправленные другими, поэтому подключение в реальном времени - это то, что я ищу за счет гарантированной доставки.

Короче говоря, я хочу иметь возможность очищать свою очередь / буфер в службе всякий раз, когда он заполняется, или когда достигается определенный предел, и начинать заполнять его снова полученными вызовами, чтобы избавиться от задержки. Как мне это сделать? Это MaxBufferSize свойство, что мне нужно уменьшить на стороне обслуживания, а также на стороне клиента? Или мне нужно кодировать эту функцию в моем сервисе? Есть идеи?

Благодарю.

РЕДАКТИРОВАТЬ:

Вот моя сервисная архитектура:

//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

Конфигурация сервиса:

  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

Клиент обычно вызывает услугу между периодами от 500 мс до 1000 мс, например:

_client.PublishNotificationAsync(byte[] data);

и обратный вызов уведомит клиента об уведомлениях, отправленных другими клиентами:

void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

Напомним, что когда число клиентов увеличивается, а скорость загрузки хостов службы через Интернет ограничена, сообщения, отправляемые службой абонентам, буферизуются где-то и обрабатываются в очереди, что и вызывает проблему, Я не знаю, где находятся эти сообщения. В локальной сети служба работает нормально, поскольку скорость загрузки сервера равна скорости его загрузки (для входящих вызовов 100 КБ / с он отправляет 100 КБ / с уведомлений). Где эти сообщения буферизируются? И как я могу очистить этот буфер?

Я сделал что-то экспериментальное, чтобы проверить, буферизируются ли сообщения в службе, я пытался вызвать этот метод на клиенте, но он всегда возвращает 0, даже если один клиент все еще находится в процессе получения уведомлений, которые кто-то еще отправил. 5 минут назад:

[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}

2 ответа

Я сделал немного математики для вашей ситуации.

  • размер сообщения = 100 КБ
  • канал загрузки = 15 КБ / с
  • Канал загрузки = 100 КБ / с
  • Клиент звонит в сервис 1-2 раза в секунду

клиент будет вызывать услугу обычно между 500 мс-1000 мс

Это правильно?

Для одного клиента ваш трафик загрузки только для сообщений будет 100-200 КБ / с, и это только тело сообщения. Там будет больше с заголовком и гораздо больше с включенной безопасностью.

Сообщения будут объединяться для асинхронного вызова. Таким образом, если у нас есть 3 клиента и каждый отправленный ответный вызов сообщения содержит 2 сообщения для каждого клиента. 4 клиента - 3 сообщения в каждом обратном вызове.

Для 3 клиентов это будет 200-400 КБ / с в канале загрузки.

Мне кажется, что сообщения слишком велики для заявленной вами пропускной способности.

Проверьте, можете ли вы:

  1. Уменьшите размер сообщения. Я не знаю природу вашего бизнеса, поэтому не могу дать совет здесь.

  2. Используйте сжатие для сообщений или трафика.

  3. Увеличьте пропускную способность сети. Без этого даже идеальное решение будет иметь очень высокую задержку. Вы можете потратить дни и недели на оптимизацию своего кода, и даже если вы правильно используете свое сетевое решение, процесс будет медленным.

Я знаю, это звучит как "Капитан Очевидность", но на некоторые вопросы просто нет правильного ответа, если вы не поменяете вопрос.

После выполнения вышеуказанных действий работайте с ServiceThrottlingBehavior в сочетании с пользовательским кодом, который будет управлять очередью обратного вызова.

ServiceThrottlingBehavior отклоняет запросы, если граница достигнута.

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

Это тривиальный образец. Действительные числа должны быть определены специально для вашей среды.

<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

Обновить:

Большая ошибка с моей стороны в вопросе, каждый вызов 5 КБ / с,

Даже с 5KB сообщениями 15KB/s недостаточно. Давайте посчитаем трафик только для 3 пользователей. 3 Входящее сообщение в секунду. Теперь система должна использовать дуплекс для уведомления пользователей о том, что отправили другие. Каждый пользователь должен получать сообщения от своих партнеров. Всего у нас 3 сообщения. Одно (которое принадлежит отправителю) может быть пропущено, поэтому каждый пользователь должен получить 2 сообщения. 3 пользователя получат 10 КБ (5 КБ + 5 КБ = одно сообщение 10 КБ) = 30 КБ. Одно сообщение в секунду составит 30 КБ / с в канале загрузки для 3 пользователей.

Где эти сообщения буферизируются? И как я могу очистить этот буфер?

Это зависит от того, как вы размещаете свой сервис. Если это самостоятельная служба, она вообще не буферизируется. У вас есть код, который пытается отправить сообщение получателю, но он идет очень медленно, потому что канал заполнен. В IIS может быть некоторая буферизация, но не рекомендуется прикасаться к ней извне.

Правильное решение дросселирует. С ограничением пропускной способности не следует пытаться отправлять все сообщения всем клиентам. Вместо этого вы должны, например, ограничить количество потоков, которые отправляют сообщения. Таким образом, из 3 вышеупомянутых пользователей вместо одновременной отправки 3 сообщений вы можете отправлять их последовательно или решить, что только один пользователь получит обновление в этом раунде, а следующий - в следующем.

Таким образом, общая идея заключается не в том, чтобы отправлять все всем, как только у вас есть данные, а в том, чтобы отправлять только тот объем данных, который вы можете себе позволить, и контролировать это с помощью подсчета потоков или скорости отклика.

MaxBufferSize не поможет вам с этой проблемой. Вам придется кодировать его самостоятельно. Я не знаю ни одного существующего решения / фреймворка. Однако это звучит как интересная проблема. Вы могли бы начать с поддержания Queue<Message> для каждого подключенного клиента и при отправке в эту очередь (или когда клиент вызывает dequeue) вы можете переоценить, что Message должно быть отправлено.

ОБНОВЛЕНИЕ: во-первых, я бы забыл о попытке сделать это со стороны клиента, и в конфигурации, вам придется кодировать это самостоятельно

Здесь я вижу, куда вы отправляете своим клиентам:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

поэтому вместо асинхронной отправки этих уведомлений каждому клиенту вы должны поместить их в Queue<byte[]>, Каждое клиентское соединение будет иметь свою очередь, и вам, вероятно, следует создать класс, выделенный для каждого клиентского соединения:

обратите внимание, что этот код не будет компилироваться "из коробки" и, вероятно, имеет некоторые логические ошибки, используйте только в качестве руководства

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

Представьте себе сценарий, когда одно сообщение отправляется клиенту, и при отправке еще 9 сообщений звоните ClientConnection.Enqueue, Когда первое сообщение закончено, оно проверяет очередь, которая должна содержать только последнее (9-е сообщение)

Другие вопросы по тегам