Получение ваших данных из EventProcessorHost
Я очень новичок в использовании EventProcessorHost и IEventProcessor, и я пытаюсь выяснить, как получить мои данные из EventProcessorClass. В настоящее время у меня все работает и работает, если я просто хочу записывать новые сообщения на консоль.
Моя текущая реализация (и я даже не уверен, является ли она приемлемой или даже хорошей практикой) создает статическую переменную, а затем просто сохраняет в ней данные по мере их поступления, чтобы другой процессор мог их собрать. Это нормально, или есть лучший способ получить доступ к данным?
Это то, что у меня есть (механизм блокировки очень прост и будет исправлен, когда я получу остальную часть кода):
internal class Receiver
{
public static List<string> incommingMessagesList = new List<string>();
public static bool fIsDataListLocked = false;
private EventProcessorHost m_EPHClient;
...
Console.WriteLine( "Registering EventProcessor..." );
await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
public class SimpleEventProcessor : IEventProcessor
{
...
public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
{
foreach( var eventData in messages )
{
while( !Receiver.fIsDataListLocked )
{
Receiver.fIsDataListLocked = true ;
Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
Receiver.fIsDataListLocked = false ;
}
}
return context.CheckpointAsync();
}
}
ОБНОВЛЕНО:
По запросу немного больше информации:
По сути, я извлекаю данные из двух разных концов конвейера, чтобы проверить, проходят ли все сообщения и проследить их пропускную способность, один конец - концентратор событий, а другой - с сервера lwm2m в виде HTTP-запроса. Итак, у меня запущен процесс контроллера, который должен получать данные с обоих концов для очистки / анализа данных. Как я уже сказал, я новичок в обработчиках событий, но для меня не имело смысла иметь дескриптор EventProcessorHost, собирающий оба набора данных и затем очищающий / анализирующий их. Я могу определенно измениться, чтобы делать вещи таким образом, но это кажется неуклюжим.
1 ответ
В типичном сценарии обработчик событий получает и сохраняет данные самым быстрым способом. Несколько экземпляров Event Processor будут считывать данные из разных разделов EventHub.
В вашем случае вы хотите отправить данные в другое место и обработать их там в сочетании с другим потоком этих данных. Коллекция в памяти, такая как List, вероятно, не лучший способ сделать это:
- Это должно быть потокобезопасным
- При сбое данные будут потеряны
- Вам нужно будет вручную удалить обработанные данные, чтобы предотвратить постоянно растущую коллекцию
Вы будете нуждаться в некоторой реализации производителя / потребителя.
Возможное решение - записать оба потока данных в одно место назначения, например в очередь хранения Azure. Это имеет главное преимущество в том, что при сбое все данные сохраняются и не теряются. Ваш конечный процессор может читать со своей скоростью из очереди.