Распространение сообщений концентратора событий

Мы используем IoT Hub для приема данных устройства. В настоящее время мы используем 10 разделов для обработки сообщений. Поэтому при запуске 5 рабочих каждый экземпляр обрабатывает 2 раздела.

Мы обнаружили, что если устройство подключается и выгружает, скажем, 500 сообщений в концентратор, все эти сообщения будут передаваться из одного контекстного раздела, даже если другие не выполняют никакой работы.

Является ли единственная возможность спроектировать систему, использующую маршруты / конечные точки для достижения этой цели?

Мы используем MQTT на устройстве низкого уровня, поэтому изменение микропрограммы устройства в краткосрочной перспективе невозможно, но возможно в долгосрочной перспективе.

Я буду честен, я думал, что это подпитывало сообщения, в зависимости от того, что было наименьшим, даже круговая малиновка была бы лучше. Скорее всего, нам нужно будет подавать сообщения в очередь и обрабатывать их оттуда для достижения лучшего масштаба. В настоящее время создается несколько потоков для обработки каждого IEnumerable<EventData> messages за каждый раздел и его НАМНОГО лучше слава богу. Однако горлышко бутылки все еще будет существовать в той или иной форме, пока мы не перейдем к созданию дополнительной очереди и скорее масштабируемся там.

ОБНОВЛЕНИЕ - добавление некоторого примера кода, показывающего, что я делаю. Просто наполовину, теперь мы имеем 10-кратное улучшение производительности, обрабатывая каждый пакет сообщений с помощью нескольких задач. Я буду рефакторинг кода в нашем следующем выпуске, но пока это работает отлично.

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
       await ProcessEventsBulk(context, messages);
    }

    async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
    {
        List<Task> TaskList = new List<Task>();
        foreach (EventData message in messages)
        {
            var LastTask = Task.Run(() => GoBoy(context, message));
            TaskList.Add(LastTask);
        }
        await Task.WhenAll(TaskList);
    }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        using (var db = new AppDbContext(_dbContextConnectionString))
        {
            await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
            await db.SaveChangesAsync();
        }
    }

Process Event сделает следующее:

  • Декодировать пакет
  • Вход в хранилище таблиц Azure через NLOG
  • Обновить значения в SQL (в частности, таблицу устройств)
  • Вставьте сообщение в DocumentDB
  • Переслать сообщение на 2 очереди
  • Ответить на блок

Я знаю, что мы можем разделить их на отдельных работников, но мне нравится, что это "в реальном времени" и безопаснее, поскольку устройство не очищает сообщение, пока мы не подтвердим.

Я создал ветку с некоторым ранним кодом, где я делаю пакетную вставку в DocDB до ACKing. Не видел существенного улучшения, но должен помочь с RU, который я предполагаю.

0 ответов

Другие вопросы по тегам