Опрос базы данных с помощью Reactive Extensions
Мне нужно своевременно запрашивать базу данных, чтобы узнать состояние устаревшей системы. Я думал об упаковке запроса вокруг Observable
, но я не знаю правильный способ сделать это.
По сути, это будет один и тот же запрос каждые 5 секунд. Но боюсь, мне придется столкнуться с этими проблемами:
- Что если выполнение запроса занимает 10 секунд? Я не хочу выполнять новый запрос, если предыдущий все еще обрабатывается.
- Также должен быть тайм-аут. Если текущий запрос не выполняется, например, через 20 секунд, необходимо зарегистрировать информативное сообщение и отправить новую попытку (тот же запрос).
Дополнительные детали:
- Запрос просто
SELECT
который возвращает набор данных со списком кодов состояния (рабочий, сбойный). - Наблюдаемая последовательность всегда будет принимать последние данные, полученные из запроса, что-то вроде метода расширения Switch.
- Я хотел бы обернуть запрос к базе данных (длинная операция) в задачу, но я не уверен, что это лучший вариант.
Я почти уверен, что запрос должен быть выполнен в другом потоке, но я понятия не имею, как должна выглядеть наблюдаемая, когда-либо читая " Введение в Rx" Ли Кэмпбелла.
2 ответа
Это довольно классический случай использования Rx для опроса другой системы. Большинство людей будут использовать Observable.Interval
как их оператор go-to, и для большинства это будет хорошо.
Однако у вас есть особые требования к тайм-аутам и повторите попытку. В этом случае я думаю, что вам лучше использовать комбинацию операторов:
Observable.Timer
чтобы позволить вам выполнить ваш запрос в указанное времяTimeout
для идентификации и запросов к базе данных, которые были переполненыToObservable()
составить карту вашегоTask
приводит к наблюдаемой последовательности.Retry
чтобы вы могли восстановиться после тайм-аутаRepeat
чтобы позволить вам продолжить после успешных запросов к базе данных. Это также сохранит этот начальный период / разрыв между завершением предыдущего запроса к базе данных и началом следующего.
Этот рабочий фрагмент LINQPad должен показать, что запрос работает правильно:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
Результаты выглядят так:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
Ключевая часть образца....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
РЕДАКТИРОВАТЬ: Вот дальнейшее объяснение того, как прийти к этому решению. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
Я думаю, что это то, что вы должны сделать:
var query =
from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
from ds in Observable.Amb(
Observable.Start(() => /* Your DataSet query */),
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(_ => new DataSet("TimeOut")))
select ds;
Это инициирует новый запрос с интервалом между выполнениями 5 секунд. Это не 5 секунд с момента начала последнего, это 5 секунд с момента окончания последнего.
Тогда вы попробуете свой запрос, но вы .Amb
это с таймером, который возвращает специальный DataSet
через 10 секунд. Если ваш запрос заканчивается до истечения 10 секунд, он побеждает, но в противном случае специальный DataSet
возвращается .Amb
Оператор - это, по сути, оператор "гонки" - первый, наблюдаемый для получения значения, выигрывает.