Опрос базы данных с помощью Reactive Extensions

Мне нужно своевременно запрашивать базу данных, чтобы узнать состояние устаревшей системы. Я думал об упаковке запроса вокруг Observable, но я не знаю правильный способ сделать это.

По сути, это будет один и тот же запрос каждые 5 секунд. Но боюсь, мне придется столкнуться с этими проблемами:

  • Что если выполнение запроса занимает 10 секунд? Я не хочу выполнять новый запрос, если предыдущий все еще обрабатывается.
  • Также должен быть тайм-аут. Если текущий запрос не выполняется, например, через 20 секунд, необходимо зарегистрировать информативное сообщение и отправить новую попытку (тот же запрос).

Дополнительные детали:

  • Запрос просто SELECT который возвращает набор данных со списком кодов состояния (рабочий, сбойный).
  • Наблюдаемая последовательность всегда будет принимать последние данные, полученные из запроса, что-то вроде метода расширения Switch.
  • Я хотел бы обернуть запрос к базе данных (длинная операция) в задачу, но я не уверен, что это лучший вариант.

Я почти уверен, что запрос должен быть выполнен в другом потоке, но я понятия не имею, как должна выглядеть наблюдаемая, когда-либо читая " Введение в Rx" Ли Кэмпбелла.

2 ответа

Решение

Это довольно классический случай использования Rx для опроса другой системы. Большинство людей будут использовать Observable.Interval как их оператор go-to, и для большинства это будет хорошо.

Однако у вас есть особые требования к тайм-аутам и повторите попытку. В этом случае я думаю, что вам лучше использовать комбинацию операторов:

  • Observable.Timer чтобы позволить вам выполнить ваш запрос в указанное время
  • Timeout для идентификации и запросов к базе данных, которые были переполнены
  • ToObservable() составить карту вашего Task приводит к наблюдаемой последовательности.
  • Retry чтобы вы могли восстановиться после тайм-аута
  • Repeat чтобы позволить вам продолжить после успешных запросов к базе данных. Это также сохранит этот начальный период / разрыв между завершением предыдущего запроса к базе данных и началом следующего.

Этот рабочий фрагмент LINQPad должен показать, что запрос работает правильно:

void Main()
{
    var pollingPeriod = TimeSpan.FromSeconds(5);
    var dbQueryTimeout = TimeSpan.FromSeconds(10);

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

    var query = Observable.Timer(pollingPeriod, scheduler)
                    .SelectMany(_ => DatabaseQuery().ToObservable())
                    .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                    .Retry()    //Loop on errors
                    .Repeat();  //Loop on success

    query.StartWith("Seed")
        .TimeInterval(scheduler)    //Just to debug, print the timing gaps.
        .Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
    //Oscillate the delay between 3 and 12 seconds
    delay += delayModifier;
    var timespan = TimeSpan.FromSeconds(delay);
    if (delay < 4 || delay > 11)
        delayModifier *= -1;
    timespan.Dump("delay");
    await Task.Delay(timespan);
    return "Value";
}

Результаты выглядят так:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

Ключевая часть образца....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
                .SelectMany(_ => DatabaseQuery().ToObservable())
                .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                .Retry()    //Loop on errors
                .Repeat();  //Loop on success

РЕДАКТИРОВАТЬ: Вот дальнейшее объяснение того, как прийти к этому решению. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

Я думаю, что это то, что вы должны сделать:

var query =
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
    from ds in Observable.Amb(
        Observable.Start(() => /* Your DataSet query */),
        Observable
            .Timer(TimeSpan.FromSeconds(10.0))
            .Select(_ => new DataSet("TimeOut")))
    select ds;

Это инициирует новый запрос с интервалом между выполнениями 5 секунд. Это не 5 секунд с момента начала последнего, это 5 секунд с момента окончания последнего.

Тогда вы попробуете свой запрос, но вы .Amb это с таймером, который возвращает специальный DataSet через 10 секунд. Если ваш запрос заканчивается до истечения 10 секунд, он побеждает, но в противном случае специальный DataSet возвращается .Amb Оператор - это, по сути, оператор "гонки" - первый, наблюдаемый для получения значения, выигрывает.

Другие вопросы по тегам