Возьмите последний предмет, переданный в Наблюдаемую (Последовательность)

У меня есть IObservable<Item> внутри класса, и я хочу предоставить доступное только для чтения свойство, которое предоставляет последний элемент, переданный наблюдаемому в данный момент времени. Так что это обеспечит единственное значение Item,

Если никакое значение не было выдвинуто, то оно должно будет вернуть значение по умолчанию.

Как я могу сделать это, не подписываясь на наблюдаемое и имея "поле поддержки"?

2 ответа

Решение

Просто чтобы немного дополнить ответ @ Асти и, возможно, помочь вам с вашим разочарованием:

Наблюдаемое - это не физическая "вещь", это скорее логическая концепция. Rx часто сравнивают с LINQ, и большую часть времени это справедливое сравнение. Однако, когда вы начинаете говорить о структурах данных, он ломается: перечислимые числа LINQ достаточно похожи на списки для целей обучения.

Однако на стороне Rx просто нет хорошего эквивалента List. Наблюдаемая - это временная структура данных, все операторы имеют дело с этим переходным состоянием. Если вы ищете постоянное состояние, вы покидаете Rx.

Сказав это, преобразование наблюдаемого в какое-то состояние является распространенной проблемой, и есть несколько пакетов, которые могут вам помочь: ReactiveUI, пожалуй, самый известный. ReactiveProperty это другое. Оба этих пакета имеют недостатки, но могут помочь вам.

Если вы просто ищете более простой способ получить защитное поле, без котельной, это будет работать:

public static class ReactivePropertyExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
    {
        return new ReactiveProperty<T>(source);
    }

    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
    {
        return new ReactiveProperty<T>(source, defaultValue);
    }
}

public class ReactiveProperty<T> : IDisposable
{
    private IObservable<T> Source { get; }
    private IDisposable Subscription { get; }
    public T Value { get; private set; }

    public ReactiveProperty(IObservable<T> source)
        : this(source, default(T)) { }

    public ReactiveProperty(IObservable<T> source, T defaultValue)
    {
        Value = defaultValue;
        Source = source;
        Subscription = source.Subscribe(t => Value = t);
    }

    public void Dispose()
    {
        Subscription.Dispose();
    }
}

Пример использования:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish().RefCount();

var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);

Предполагая горячую наблюдаемость.

За observable = source.Replay(1); observable.Connect();

Укажите значение с помощью:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

Это вернет значение по умолчанию, если никакие значения не были переданы.

Вам нужен переход от Reactive к состоянию, поэтому поле поддержки не является ужасным вариантом. Вы упомянули, что не хотите подписываться, но что- то наблюдать: что-то, где-то должно подписаться.

Вот еще один способ определить Valueсобственности, в духе Асти решения.

private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;

public SomeClass() // Constructor
{
    _source = /* Initialize the source observable (hot) */

    _lastValue = _source
        .Catch(Observable.Never<Item>())
        .Concat(Observable.Never<Item>())
        .Multicast(new BehaviorSubject<Item>(default))
        .AutoConnect(0)
        .FirstAsync();
}

public Item Value => _lastValue.Wait();

В BehaviorSubject<T> специализированный ISubject<T> тот...

Представляет значение, которое изменяется со временем. Наблюдатели могут подписаться на тему, чтобы получать последнее (или начальное) значение и все последующие уведомления.

В Catch и Concat были добавлены операторы, чтобы сохранить последнее значение, даже в случае, если исходная последовательность завершается нормально или в исключительных случаях.

Лично я бы не решился использовать это решение, поскольку volatile поле обновлено в DoОператор сделает то же самое более естественно. Выкладываю в основном как демонстрацию возможностей Rx.

Другие вопросы по тегам