Как генерировать IObservable индекс дельта на ходу?

У меня есть два источника данных.
Один из них - это кэшированный список, а другой - новые данные, передаваемые через IObservable<T>,

Я хочу использовать Rx, чтобы выяснить, какие операции необходимо выполнить с кэшированным списком A, чтобы сделать его идентичным по своему порядку и содержанию с новыми данными.

Я ищу функцию, которая принимает IEnumerable<T> a а также IObservable<T> b и возвращает наблюдаемое, которое помещает операции (вставки и удаления) в a это сделало бы его идентичным b не дожидаясь b завершить.

Примечание: я знаю, что не могу изменить список или заметки. Я не хочу

Я только хочу знать, какие операции, в каком порядке превратят гипотетический список, идентичный по своему порядку и последовательности с A в список, идентичный по своему порядку и последовательности с B, как только эти операции станут известны.

И то и другое a а также b уникальны и отсортированы, T инвентарь IComparable<T> а также IEquatable<T>,

public static IObservable<Tuple<int, bool>> IndexDelta<T>(
    IEnumerable<T> a,
    IObservable<T> b
) where T : IEquatable<T>, IComparable<T> {
    // ???
}

я использую int в моем примере.

Какие?!

Рассмотрим эти две последовательности:

A: [150, 100, 70, 30, 20]
B: [300, 200, 100, 70, 60, 50, 20]

Цель состоит в том, чтобы найти серию операций удаления / вставки, которые преобразуют A в B. Думаю, что A - это кэшированный источник данных, B - новые данные, я хочу знать, как преобразовать эти обновления в сетку без перезагрузки.

Ряды отсортированы в обоих источниках.

Я хочу, чтобы вывод был в форме

[(0, true), (1, true), (0, false), (3, false), (4, true), (5, true)]

Позже я бы сгруппировал эти операции по логическому флагу:

deleted:  [0, 3]
inserted: [0, 1, 4, 5]

который будет переводить на человеческий язык как

  1. Удалить A 0 и A 3:

    A = [ 150 100, 70, 30 , 20] = [100, 70, 20]

  2. Вставьте B 0, B 1, B 4, B 5 в A:

    A = [ 300, 200, 100, 70, 60, 50, 20]

  3. Теперь А идентично В.

Требования

Есть несколько важных вещей, которые я хочу отметить:

  1. A - это список, который гарантированно не изменится. B - холодная наблюдаемая, для завершения которой требуется некоторое время, но довольно скоро появляются первые предметы. Следовательно, наблюдаемый результат необходимо сдвинуть, как только будет доступно достаточно данных.

  2. Предметы гарантированно будут уникальными с IEquatable<T> в обоих источниках.

  3. Элементы не являются изменяемыми и гарантированно сортируются по убыванию IComparable<T> в обоих источниках.

  4. Предпочтительно оптимизировать для новых элементов, добавляемых слева от B. Это самый распространенный сценарий. Однако возможно, что элементы будут удалены или вставлены в любом другом месте, учитывая, что их временная метка является подходящей (не нарушает сортировку). Подумайте, рулон камеры iPhone.

  5. (*) Меня интересует чисто функциональное решение, если это возможно.

Эскиз псевдокода

Я набросал алгоритм псевдокода, который реализует это в обязательном порядке.

Я помирился Current, MoveNext, await а также yield push семантика, но идея должна иметь некоторый смысл.

IObservable<Tuple<int, bool>> IndexDelta(a, b)
{
    var indexA = 0;
    var indexB = 0;

    while (true) {
        var headA = a.Current;
        var headB = b.Current; 

        if (headA == null && headB == null) {
            return yield break; // both sequences are over
        }

        var reportDeletion = () => {
            yield push Tuple.Create(indexA, false);
            await a.MoveNext(); // this one is fast
        }

        var reportInsertion = () => {
            yield push Tuple.Create(indexB, true);
            await b.MoveNext(); // can take a long time
        }

        if (headA == null) { // No source item at this position
            reportInsertion();
            continue;
        }

        if (headB == null) { // No fetched item at this position
            reportDeletion();
            continue;
        }

        switch (headB.CompareTo(headA)) {
            case 0:
                yield continue;
                break;
            case 1: // Fetched item is newer than source item
                reportInsertion();
                break; 
            case -1: // Source item is newer than fetched item
                reportDeletion();
                break; 
        }

        indexA++;
        indexB++;
    }
} 

Я считаю, что вы могли бы реализовать что-то очень похожее с Subject<T>, Однако я не хочу переходить к этому решению, потому что мне интересно, возможно ли решить его чисто путем создания функций Rx, таких как Aggregate, Zip или же CombineLatest,

о чем ты думаешь?

2 ответа

Решение

Кажется, работает...

void Main()
{
    var a = new int?[] {150, 100, 70, 30, 20 };
    var b = new int?[] {300, 200, 100, 70, 60, 50, 20 };
    var result = IndexDelta(a, b);
    result.Dump();
}

// Define other methods and classes here
IObservable<Tuple<int, bool>> IndexDelta(IEnumerable<int?> a, IEnumerable<int?> b)
{
    var observable = Observable.Create<Tuple<int, bool>>(o => {
        var indexA = 0;
        var indexB = 0;
        var aEnumerator = a.GetEnumerator();
        var bEnumerator = b.GetEnumerator();
        var aHasNext = aEnumerator.MoveNext();
        var bHasNext = bEnumerator.MoveNext();

        while(true) {
            if (aHasNext == false && bHasNext == false) {
                "Completed".Dump();
                o.OnCompleted(); // both sequences are over
                break;
            }

            var headA = aEnumerator.Current;
            var headB = bEnumerator.Current; 

            headA.Dump("A");
            headB.Dump("B");

            Action reportDeletion = () => {
                o.OnNext(Tuple.Create(indexA, false));
                aHasNext = aEnumerator.MoveNext(); // this one is fast
            };
            Action reportInsertion = () => {
                o.OnNext(Tuple.Create(indexB, true));
                bHasNext = bEnumerator.MoveNext(); // can take a long time
            };

            if (headA == null) { // No source item at this position
                reportInsertion();
                continue;
            }

            if (headB == null) { // No fetched item at this position
                reportDeletion();
                continue;
            }   

            switch (headB.Value.CompareTo(headA.Value)) {
                case 0:     
                    aHasNext = aEnumerator.MoveNext();
                    bHasNext = bEnumerator.MoveNext();
                    indexA++;
                    indexB++;
                    break;
                case 1: // Fetched item is newer than source item
                    reportInsertion();
                    indexB++;
                    break; 
                case -1: // Source item is newer than fetched item
                    reportDeletion();
                    indexA++;
                    break; 
            }           
        }
        return Disposable.Empty;
    });     
    return observable;
} 

Этот код основан на ответе Ричарда, но работает с любым T,
Я не смог избежать проклятия ToEnumerable хотя - любая помощь приветствуется.

IObservable<Tuple<int, T, bool>> IndexDelta<T>(
    IObservable<T> first, IObservable<T> second
)
    where T : IComparable, IEquatable<T>
{
    return Observable.Create<Tuple<int, T, bool>> (o => {
        var a = first.ToEnumerable ().GetEnumerator ();
        var b = second.ToEnumerable ().GetEnumerator ();

        var indexA = -1;
        var indexB = -1;

        var hasNextA = true;
        var hasNextB = true;

        var headA = default(T);
        var headB = default(T);

        Action<bool> advanceA = (bool reportDeletion) => {
            if (reportDeletion) {
                o.OnNext (Tuple.Create (indexA, headA, false));
            }

            if (hasNextA = a.MoveNext ()) {
                indexA++;
                headA = a.Current;
            }
        };

        Action<bool> advanceB = (bool reportInsertion) => {
            if (reportInsertion) {
                o.OnNext (Tuple.Create (indexB, headB, true));
            }

            if (hasNextB = b.MoveNext ()) {
                indexB++;
                headB = b.Current;
            }
        };

        advanceA (false);
        advanceB (false);

        while (true) {
            if (!hasNextA && !hasNextB) {
                o.OnCompleted ();
                break;
            }

            if (!hasNextA) {
                advanceB (true);
                continue;
            } 

            if (!hasNextB) {
                advanceA (true);
                continue;
            } 

            switch (headA.CompareTo (headB)) {
            case 0:
                advanceA (false);
                advanceB (false);
                break;
            case 1:
                advanceA (true);
                break; 
            case -1:
                advanceB (true);
                break; 
            }          
        }

        return Disposable.Create (() => {
            a.Dispose ();
            b.Dispose ();
        });
    });     
} 
Другие вопросы по тегам