Распространение сообщений концентратора событий
Мы используем IoT Hub для приема данных устройства. В настоящее время мы используем 10 разделов для обработки сообщений. Поэтому при запуске 5 рабочих каждый экземпляр обрабатывает 2 раздела.
Мы обнаружили, что если устройство подключается и выгружает, скажем, 500 сообщений в концентратор, все эти сообщения будут передаваться из одного контекстного раздела, даже если другие не выполняют никакой работы.
Является ли единственная возможность спроектировать систему, использующую маршруты / конечные точки для достижения этой цели?
Мы используем MQTT на устройстве низкого уровня, поэтому изменение микропрограммы устройства в краткосрочной перспективе невозможно, но возможно в долгосрочной перспективе.
Я буду честен, я думал, что это подпитывало сообщения, в зависимости от того, что было наименьшим, даже круговая малиновка была бы лучше. Скорее всего, нам нужно будет подавать сообщения в очередь и обрабатывать их оттуда для достижения лучшего масштаба. В настоящее время создается несколько потоков для обработки каждого IEnumerable<EventData> messages
за каждый раздел и его НАМНОГО лучше слава богу. Однако горлышко бутылки все еще будет существовать в той или иной форме, пока мы не перейдем к созданию дополнительной очереди и скорее масштабируемся там.
ОБНОВЛЕНИЕ - добавление некоторого примера кода, показывающего, что я делаю. Просто наполовину, теперь мы имеем 10-кратное улучшение производительности, обрабатывая каждый пакет сообщений с помощью нескольких задач. Я буду рефакторинг кода в нашем следующем выпуске, но пока это работает отлично.
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
await ProcessEventsBulk(context, messages);
}
async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
{
List<Task> TaskList = new List<Task>();
foreach (EventData message in messages)
{
var LastTask = Task.Run(() => GoBoy(context, message));
TaskList.Add(LastTask);
}
await Task.WhenAll(TaskList);
}
async Task GoBoy(PartitionContext context, EventData message)
{
using (var db = new AppDbContext(_dbContextConnectionString))
{
await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
await db.SaveChangesAsync();
}
}
Process Event сделает следующее:
- Декодировать пакет
- Вход в хранилище таблиц Azure через NLOG
- Обновить значения в SQL (в частности, таблицу устройств)
- Вставьте сообщение в DocumentDB
- Переслать сообщение на 2 очереди
- Ответить на блок
Я знаю, что мы можем разделить их на отдельных работников, но мне нравится, что это "в реальном времени" и безопаснее, поскольку устройство не очищает сообщение, пока мы не подтвердим.
Я создал ветку с некоторым ранним кодом, где я делаю пакетную вставку в DocDB до ACKing. Не видел существенного улучшения, но должен помочь с RU, который я предполагаю.