Хост обработчика событий не получает сообщения
У меня есть рабочая роль Azure с хостом процессора событий, подключенным к концентратору событий Azure. По неизвестной причине - он не получит никаких сообщений.
журналы показывают, что он открывает EventProcessor
для каждого раздела - и нет ошибок - но ProcessEventsAsync
никогда не называется.
с помощью Service Bus Explorer я вижу, что он получает сообщения, когда процессор не работает, а когда он включен, выдает исключение, что приемник включен.
- Я однажды заставил его работать, но после перезагрузки он не продолжал работать
Я понятия не имею, где искать дальше - это, однако, код для рабочей роли
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost _eventProcessorHost;
private IEventProcessorFactory _processorFactory;
private ConfigurationProvider configuration = new ConfigurationProvider();
private string _eventHubConnectionString;
private string _storageAccountConnectionString;
private string _dbConnectionString;
public override void Run()
{
Trace.TraceInformation("EventHubWorker is running");
try
{
RunAsync(_cancellationTokenSource.Token).Wait();
}
finally
{
_runCompleteEvent.Set();
}
}
public override bool OnStart()
{
Trace.TraceInformation("EventHubWorker is starting");
CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
SqlMapper.AddTypeHandler(new DateTimeHandler());
_eventHubConnectionString = configuration.EventHubConnectionString;
_dbConnectionString = configuration.DbConnectionString;
_storageAccountConnectionString = configuration.StorageConnectionString;
string hostName = Guid.NewGuid().ToString();
var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName);
_eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName,
_eventHubConnectionString, _storageAccountConnectionString);
var partitionOptions = new PartitionManagerOptions()
{
LeaseInterval = new TimeSpan(0, 5, 0)
};
_processorFactory = new EventProcessorFactory(/* some data for dependency injection */);
return base.OnStart();
}
public override void OnStop()
{
Trace.TraceInformation("EventHubWorker is stopping");
_cancellationTokenSource.Cancel();
_runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("EventHubWorker has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
int retryCount = 0;
var exceptions = new List<Exception>();
async Task StartProcessing()
{
if (retryCount > 5)
{
throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions);
}
try
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions
{
InitialOffsetProvider = o => DateTime.UtcNow,
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
});
}
catch(MessagingException e) when (e.IsTransient)
{
retryCount++;
exceptions.Add(e);
await StartProcessing();
}
}
var options = new EventProcessorOptions();
options.ExceptionReceived += Options_ExceptionReceived;
await StartProcessing();
cancellationToken.WaitHandle.WaitOne();
await _eventProcessorHost.UnregisterEventProcessorAsync();
}
private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
Trace.TraceError(e.Exception.Message);
}
}
Это код EventProcessor - сама фабрика кажется неактуальной
class EventProcessor : IEventProcessor
{
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
//never logged
Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
else
{
Trace.TraceError(reason.ToString());
}
}
public Task OpenAsync(PartitionContext context)
{
//always logs with the expected lease information
Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}");
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("processing event"); //never called
// processing code
}
1 ответ
Максимальный интервал аренды для PartitionManagerOptions составляет 60 секунд (аналогично аренде блобов). EventProcessorHost не будет генерировать исключения при первоначальном получении аренды. Попробуйте установить интервал аренды на 60 секунд вместо 5 минут.