Рабочая роль Azure High CPU

Так что это довольно широкий вопрос, но идеи исчерпаны. В настоящее время мы запускаем 2 экземпляра рабочих ролей, которые выполняют следующее:

  • Контролирует и обрабатывает события IoT Hub, порождая N потоков для каждого пакета.
  • Мониторинг и обработка сообщений Connect/Disconnect (мониторинг операций) от IoT Hub
  • Работает ли какая-либо служебная шина (темы и очереди)
  • Записывает в SQL, DocDB (Mongo API) и хранилище таблиц Azure для ведения журнала через NLOG
  • Отправляет облако на устройство через IoT Hub

Проблема, с которой мы сталкиваемся, заключается в том, что во время пиковых нагрузок наш процессор, очевидно, увеличивается, но, к сожалению, никогда не возвращается назад и часто стреляет до 100% и остается там до тех пор, пока я не перезапущу экземпляры, чтобы вернуть его обратно. Я продолжаю изучать темы, так как все еще чувствую, что это может быть связано со сценарием типа while(1), хотя не могу понять, почему. Давай войдем в код сейчас...

В WorkerRole.cs:

    class WorkerRole : RoleEntryPoint
    {
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

        public override void Run()
        {
            _eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);

            //Wait for shutdown to be called, else the role will recycle
            this.runCompleteEvent.WaitOne();
        }
    }

В EventProcessor.cs: я постараюсь опустить много сока, но добавлю то, что, по моему мнению, может быть достойным. По возможности добавим "псевдокод".

public class EventProcessor : IEventProcessor
{
  private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

  public async Task Start(string serviceId, int InstanceIndex)
  {

    //Setup Topic

    //Setup Queue

    //Setup EventProcessorHost for receiving events and operations monitoring and start listening

    //Receiving cloud to device feedback from service
    ReceiveFeedbackAsync();

    runCompleteEvent.WaitOne();
  }

  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
  {
        if (messages.Count() > 0)
        {
            if (!_cancellationSource.IsCancellationRequested)
            {
                await ProcessEventsBulk(context, messages);
            }
        }

        if (messages.Count() > 0)
        {
            await context.CheckpointAsync();               
        }
   }

  async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
        {
            List<Task> TaskList = new List<Task>();
            foreach (EventData message in messages)
            {
                var LastTask = Task.Run(() => GoBoy(context, message));
                TaskList.Add(LastTask);
            }
            await Task.WhenAll(TaskList);
        }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        try
        {
            using (var db = new AppDbContext(_dbContextConnectionString))
            {
                await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
                await db.SaveChangesAsync();
            }
        }
        catch (Exception e)
        {
           //Do Some stuff...
        }
    }

  private async void ReceiveFeedbackAsync()
    {
        var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
        while (true)
        {
            try
            {
              var feedbackBatch = await feedbackReceiver.ReceiveAsync();
              if (feedbackBatch == null) continue;
              foreach (var records in feedbackBatch.Records)
              {

              }
              await feedbackReceiver.CompleteAsync(feedbackBatch);
            }
            catch (Exception)
            {
              Thread.Sleep(30000);                    
            }
         }

    }

}

Пожалуйста, не стесняйтесь спрашивать, если вам нужно что-то еще. Я действительно очень ценю любую помощь.

Здесь показано падение процессора после перезапуска рабочих

Поддержка Microsoft помогла мне сделать несколько PerfViews и ProcDump. В результате мы должны были заглянуть в поток, называющий наш хаб " https://abcxyz.azure-devices.net/ $ iothub / websocket". Вот почему я решил добавить метод ReceiveFeedbackAsync(), так как я знаю, что он опирается на постоянное подключение к нашему хабу для сбора отзывов.

Из того, что я вижу, мы правильно регистрируемся на нашем EVPH, но дайте мне знать, если кто-то также захочет просмотреть этот код.

1 ответ

Вы прошли через код и убедились, что вы не создаете условие бесконечного цикла, которое не выдает никаких исключений, чтобы ваш Thead.Sleep выполнялся. Поскольку вы ожидаете, что в своем коде будет Sleep, лучше избегать использования исключения для его запуска. Возможно, закодируйте его в Sleep после обработки каждой партии отзывов. Исключение составляют обработка ошибок и исключительные обстоятельства, а не управление потоком логики.

Другие вопросы по тегам