Хост обработчика событий не получает сообщения

У меня есть рабочая роль Azure с хостом процессора событий, подключенным к концентратору событий Azure. По неизвестной причине - он не получит никаких сообщений.

журналы показывают, что он открывает EventProcessor для каждого раздела - и нет ошибок - но ProcessEventsAsync никогда не называется.

с помощью Service Bus Explorer я вижу, что он получает сообщения, когда процессор не работает, а когда он включен, выдает исключение, что приемник включен.

  • Я однажды заставил его работать, но после перезагрузки он не продолжал работать

Я понятия не имею, где искать дальше - это, однако, код для рабочей роли

public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

    private EventProcessorHost _eventProcessorHost;
    private IEventProcessorFactory _processorFactory;
    private ConfigurationProvider configuration = new ConfigurationProvider();
    private string _eventHubConnectionString;
    private string _storageAccountConnectionString;
    private string _dbConnectionString;

    public override void Run()
    {
        Trace.TraceInformation("EventHubWorker is running");


        try
        {
            RunAsync(_cancellationTokenSource.Token).Wait();
        }
        finally
        {
            _runCompleteEvent.Set();
        }
    }   

    public override bool OnStart()
    {
        Trace.TraceInformation("EventHubWorker is starting");
        CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;
        SqlMapper.AddTypeHandler(new DateTimeHandler());
        _eventHubConnectionString = configuration.EventHubConnectionString;
        _dbConnectionString = configuration.DbConnectionString;
        _storageAccountConnectionString = configuration.StorageConnectionString;
        string hostName = Guid.NewGuid().ToString();
        var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName);

        _eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName,
            _eventHubConnectionString, _storageAccountConnectionString);

        var partitionOptions = new PartitionManagerOptions()
        {
            LeaseInterval = new TimeSpan(0, 5, 0)
        };
        _processorFactory = new EventProcessorFactory(/* some data for dependency injection */);

        return base.OnStart();
    }

    public override void OnStop()
    {
        Trace.TraceInformation("EventHubWorker is stopping");

        _cancellationTokenSource.Cancel();
        _runCompleteEvent.WaitOne();
        base.OnStop();

        Trace.TraceInformation("EventHubWorker has stopped");
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        int retryCount = 0;
        var exceptions = new List<Exception>();
        async Task StartProcessing()
        {
            if (retryCount > 5)
            {
                throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions);
            }
            try
            {
                await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions
                {
                    InitialOffsetProvider = o => DateTime.UtcNow,
                    MaxBatchSize = 100,
                    PrefetchCount = 10,
                    ReceiveTimeOut = TimeSpan.FromSeconds(20),
                });
            }
            catch(MessagingException e) when (e.IsTransient)
            {
                retryCount++;
                exceptions.Add(e);
                await StartProcessing();
            }
        }
        var options = new EventProcessorOptions();
        options.ExceptionReceived += Options_ExceptionReceived;

        await StartProcessing();

        cancellationToken.WaitHandle.WaitOne();
        await _eventProcessorHost.UnregisterEventProcessorAsync();
    }

    private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
    {
        Trace.TraceError(e.Exception.Message);
    }
}

Это код EventProcessor - сама фабрика кажется неактуальной

class EventProcessor : IEventProcessor
{
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
                   //never logged
        Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
        else
        {
            Trace.TraceError(reason.ToString());
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
                    //always logs with the expected lease information
        Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}");
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        Trace.TraceInformation("processing event"); //never called
        // processing code
    }

1 ответ

Максимальный интервал аренды для PartitionManagerOptions составляет 60 секунд (аналогично аренде блобов). EventProcessorHost не будет генерировать исключения при первоначальном получении аренды. Попробуйте установить интервал аренды на 60 секунд вместо 5 минут.

Другие вопросы по тегам