Получение ваших данных из EventProcessorHost

Я очень новичок в использовании EventProcessorHost и IEventProcessor, и я пытаюсь выяснить, как получить мои данные из EventProcessorClass. В настоящее время у меня все работает и работает, если я просто хочу записывать новые сообщения на консоль.

Моя текущая реализация (и я даже не уверен, является ли она приемлемой или даже хорошей практикой) создает статическую переменную, а затем просто сохраняет в ней данные по мере их поступления, чтобы другой процессор мог их собрать. Это нормально, или есть лучший способ получить доступ к данным?

Это то, что у меня есть (механизм блокировки очень прост и будет исправлен, когда я получу остальную часть кода):

internal class Receiver
{
    public static List<string> incommingMessagesList = new List<string>();
    public static bool fIsDataListLocked = false;

    private EventProcessorHost m_EPHClient;

   ...

    Console.WriteLine( "Registering EventProcessor..." );
    await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
 }
 public class SimpleEventProcessor : IEventProcessor
 {
   ...

    public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
    {
        foreach( var eventData in messages )
        {
            while( !Receiver.fIsDataListLocked )
            {
                Receiver.fIsDataListLocked = true ;
                Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
                Receiver.fIsDataListLocked = false ;
            }

        }
        return context.CheckpointAsync();
    }
}

ОБНОВЛЕНО:

По запросу немного больше информации:

По сути, я извлекаю данные из двух разных концов конвейера, чтобы проверить, проходят ли все сообщения и проследить их пропускную способность, один конец - концентратор событий, а другой - с сервера lwm2m в виде HTTP-запроса. Итак, у меня запущен процесс контроллера, который должен получать данные с обоих концов для очистки / анализа данных. Как я уже сказал, я новичок в обработчиках событий, но для меня не имело смысла иметь дескриптор EventProcessorHost, собирающий оба набора данных и затем очищающий / анализирующий их. Я могу определенно измениться, чтобы делать вещи таким образом, но это кажется неуклюжим.

1 ответ

Решение

В типичном сценарии обработчик событий получает и сохраняет данные самым быстрым способом. Несколько экземпляров Event Processor будут считывать данные из разных разделов EventHub.

В вашем случае вы хотите отправить данные в другое место и обработать их там в сочетании с другим потоком этих данных. Коллекция в памяти, такая как List, вероятно, не лучший способ сделать это:

  • Это должно быть потокобезопасным
  • При сбое данные будут потеряны
  • Вам нужно будет вручную удалить обработанные данные, чтобы предотвратить постоянно растущую коллекцию

Вы будете нуждаться в некоторой реализации производителя / потребителя.

Возможное решение - записать оба потока данных в одно место назначения, например в очередь хранения Azure. Это имеет главное преимущество в том, что при сбое все данные сохраняются и не теряются. Ваш конечный процессор может читать со своей скоростью из очереди.

Другие вопросы по тегам