IEventProcessor обрабатывает сообщение два раза
Привет, я использую концентратор событий для приема данных с частотой 1 секунда.
Я постоянно отправляю смоделированные данные из консольного приложения в концентратор событий, а затем сохраняю их в базу данных SQL.
Прошло уже более 5 дней, и я каждый день несколько раз обнаруживал, что мой приемник обрабатывает данные два раза, поэтому я получил дубликаты записей в базу данных.
Так как это происходит только один или два раза в день, поэтому я даже не могу отследить.
Может ли кто-нибудь сталкивался с такой ситуацией до сих пор? Или возможно, что хост может обрабатывать одни и те же сообщения дважды? Или это проблема асинхронного поведения приемника?
Ждем помощи....
Фрагмент кода:
public class SimpleEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
// store data into SQL database / database call.
}
// Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(0))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
if (messages.Count() > 0)
await context.CheckpointAsync();
}
}
1 ответ
Event Hub гарантирует как минимум однократную доставку:
Он имеет следующие характеристики:
- низкая задержка
- способный принимать и обрабатывать миллионы событий в секунду
- хотя бы раз доставки
Таким образом, вы можете ожидать, что это произойдет.
Также примите во внимание ситуацию, когда контрольная точка только что произошла, затем обрабатывается еще несколько сообщений (давайте назовем их A и B), а затем процесс завершается неудачей. В следующий раз, когда процесс чтения снова запустится после того, как потребление сообщения об ошибке начнется с последнего сообщения с контрольной точкой, другими словами, сообщения A и B будут обработаны снова.