Несколько долгосрочных реализаций IHostedSerivces или BackgroundService
Я не могу найти много по этому вопросу. Проблема, с которой я сталкиваюсь, заключается в том, что у меня есть необходимость запускать более 2-х запущенных фоновых служб, но выполняется только ExecuteAsync первой зарегистрированной службы. Я попытался реализовать его через BackgroundService и поместить код в ExecuteAsync, и я попытался реализовать IHostedService напрямую и поместить долго работающий код в StartAsync.
Я думаю, что проблема в возвращении Task.CompletedTask; никогда не называется. Например, у меня есть два потребителя Kafka, реализованные как BackgroundServices. Код выглядит одинаково как в отличие от темы, так и в методах OnMessage.
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var kafkaEndpoint = _kafkaConfig.Endpoint;
var kafkaTopic = "PhoenixEventStore";
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "consumer1" },
{ "bootstrap.servers", kafkaEndpoint },
{ "auto.offset.reset", "earliest" }
};
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) =>
{
Log.Information($"Consumer1 Received {msg.Value}");
};
consumer.OnPartitionEOF += (_, end) =>
{
Log.Information($"Consumer1 Reached end of topic {end.Topic} partition {end.Partition}.");
};
consumer.OnError += (_, error) =>
{
Log.Error($"Consumer1 Error: {error}");
};
consumer.Subscribe(new List<string>() { kafkaTopic });
while (!stoppingToken.IsCancellationRequested)
{
consumer.Poll(TimeSpan.FromSeconds(10));
}
//consider setting value that check whether the consumer has stopped polling.
}
return Task.CompletedTask;
}
Поскольку обе службы долго работают, Task.Complete никогда не срабатывает. Однако, если я закомментирую цикл while, обе службы ExecuteAsync будут задействованы вместо первой зарегистрированной.
Я нашел работу вокруг, которая, кажется, работает, но интересно, есть ли у кого-то еще лучший подход.
По сути, я выполняю рефакторинг кода, чтобы долго выполняемый код выполнялся в пустом методе StartConsumer, после чего мой ExecuteAsync выглядит следующим образом
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Task.Run(() => StartConsumer(stoppingToken));
return Task.CompletedTask;
}
Обе службы зарегистрированы с использованием
services.AddHostedService<MyHostedService1>
services.AddHostedService<MyHostedService2>
1 ответ
У меня такой же случай и все работает. Шаблон
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Foo()
}
catch (Exception ex)
{
_log.Error(ex.Message, exception: ex);
}
await Task.Delay(timeInMlSeconds);
}
}