О стратегии контрольных точек в процессоре событий концентратора

Я использую узел процессора концентраторов событий для получения и обработки событий от концентраторов событий. Для повышения производительности я вызываю контрольную точку каждые 3 минуты, а не каждый раз при получении событий:

public async Task ProcessEventAsync(context, messages)
{
 foreach (var eventData in messages)
 {
    // do something
 }

 if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
 {
     await context.CheckpointAsync();
 }
}

Но проблема в том, что могут быть некоторые события, никогда не являющиеся контрольными точками, если не новые события, отправляемые в концентраторы событий, поскольку ProcessEventAsync не будет вызываться, если нет новых сообщений.

Любые предложения, чтобы убедиться, что все обработанные события являются контрольной точкой, но все же контрольной точкой каждые несколько минут?

Обновление: согласно предложению Срирама, я обновил код, как показано ниже:

public async Task ProcessEventAsync(context, messages)
{
    foreach (var eventData in messages)
    {
     // do something    
    }

    this.lastProcessedEventsCount += messages.Count();

    if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
    {
        this.checkpointStopWatch.Restart();
        if (this.lastProcessedEventsCount > 0)
        {
            await context.CheckpointAsync();
            this.lastProcessedEventsCount = 0;
        }
    }
}

1 ответ

Решение

Отличный случай - вы покрываете!

Вы можете испытать потерю event checkpoints (и в результате event replay) в следующих 2 случаях:

  1. когда у вас редкий поток данных (например, пакет сообщений каждые 5 минут и интервал проверки 3 минуты) и EventProcessorHost экземпляр почему-то закрывается - вы могли видеть 2 min из EventData - повторная обработка. Чтобы справиться с этим делом, следите за lastProcessedEvent после завершения IEventProcessor.onEvents/IEventProcessor.ProcessEventsAsync & контрольно-пропускной пункт, когда вы получите уведомление о закрытии - IEventProcessor.onClose/IEventProcessor.CloseAsync,

  2. Может быть просто случай, когда больше нет событий для конкретного EventHubs partition, В этом случае вы никогда не увидите последнее событие с контрольной точкой - с вашим Checkpointing strategy, Тем не менее, это редко, когда у вас есть непрерывный поток EventData и вы не отправляете на определенный раздел EventHubs (EventHubClient.send(EventData_Without_PartitionKey)). Если вы думаете - вы можете столкнуться с этой ситуацией, используйте:

    EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout (истина); // в java или EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // в C#

флаг, чтобы разбудить processEventsAsync очень часто Затем следите, LastProcessedEventData а также LastCheckpointedEventData и принять решение, стоит ли проходить контрольный пункт, когда нет Events получены на основе EventData.SequenceNumber собственность на тех событиях.

Другие вопросы по тегам