О стратегии контрольных точек в процессоре событий концентратора
Я использую узел процессора концентраторов событий для получения и обработки событий от концентраторов событий. Для повышения производительности я вызываю контрольную точку каждые 3 минуты, а не каждый раз при получении событий:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
await context.CheckpointAsync();
}
}
Но проблема в том, что могут быть некоторые события, никогда не являющиеся контрольными точками, если не новые события, отправляемые в концентраторы событий, поскольку ProcessEventAsync не будет вызываться, если нет новых сообщений.
Любые предложения, чтобы убедиться, что все обработанные события являются контрольной точкой, но все же контрольной точкой каждые несколько минут?
Обновление: согласно предложению Срирама, я обновил код, как показано ниже:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
this.lastProcessedEventsCount += messages.Count();
if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
this.checkpointStopWatch.Restart();
if (this.lastProcessedEventsCount > 0)
{
await context.CheckpointAsync();
this.lastProcessedEventsCount = 0;
}
}
}
1 ответ
Отличный случай - вы покрываете!
Вы можете испытать потерю event checkpoints
(и в результате event replay
) в следующих 2 случаях:
когда у вас редкий поток данных (например, пакет сообщений каждые 5 минут и интервал проверки 3 минуты) и
EventProcessorHost
экземпляр почему-то закрывается - вы могли видеть2 min
изEventData
- повторная обработка. Чтобы справиться с этим делом, следите заlastProcessedEvent
после завершенияIEventProcessor.onEvents
/IEventProcessor.ProcessEventsAsync
& контрольно-пропускной пункт, когда вы получите уведомление о закрытии -IEventProcessor.onClose
/IEventProcessor.CloseAsync
,Может быть просто случай, когда больше нет событий для конкретного
EventHubs partition
, В этом случае вы никогда не увидите последнее событие с контрольной точкой - с вашимCheckpointing strategy
, Тем не менее, это редко, когда у вас есть непрерывный потокEventData
и вы не отправляете на определенный раздел EventHubs (EventHubClient.send(EventData_Without_PartitionKey)
). Если вы думаете - вы можете столкнуться с этой ситуацией, используйте:EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout (истина); // в java или EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // в C#
флаг, чтобы разбудить processEventsAsync
очень часто Затем следите, LastProcessedEventData
а также LastCheckpointedEventData
и принять решение, стоит ли проходить контрольный пункт, когда нет Events
получены на основе EventData.SequenceNumber
собственность на тех событиях.