Как генерировать IObservable индекс дельта на ходу?
У меня есть два источника данных.
Один из них - это кэшированный список, а другой - новые данные, передаваемые через IObservable<T>
,
Я хочу использовать Rx, чтобы выяснить, какие операции необходимо выполнить с кэшированным списком A, чтобы сделать его идентичным по своему порядку и содержанию с новыми данными.
Я ищу функцию, которая принимает IEnumerable<T> a
а также IObservable<T> b
и возвращает наблюдаемое, которое помещает операции (вставки и удаления) в a
это сделало бы его идентичным b
не дожидаясь b
завершить.
Примечание: я знаю, что не могу изменить список или заметки. Я не хочу
Я только хочу знать, какие операции, в каком порядке превратят гипотетический список, идентичный по своему порядку и последовательности с A в список, идентичный по своему порядку и последовательности с B, как только эти операции станут известны.
И то и другое a
а также b
уникальны и отсортированы, T
инвентарь IComparable<T>
а также IEquatable<T>
,
public static IObservable<Tuple<int, bool>> IndexDelta<T>(
IEnumerable<T> a,
IObservable<T> b
) where T : IEquatable<T>, IComparable<T> {
// ???
}
я использую int
в моем примере.
Какие?!
Рассмотрим эти две последовательности:
A: [150, 100, 70, 30, 20]
B: [300, 200, 100, 70, 60, 50, 20]
Цель состоит в том, чтобы найти серию операций удаления / вставки, которые преобразуют A в B. Думаю, что A - это кэшированный источник данных, B - новые данные, я хочу знать, как преобразовать эти обновления в сетку без перезагрузки.
Ряды отсортированы в обоих источниках.
Я хочу, чтобы вывод был в форме
[(0, true), (1, true), (0, false), (3, false), (4, true), (5, true)]
Позже я бы сгруппировал эти операции по логическому флагу:
deleted: [0, 3]
inserted: [0, 1, 4, 5]
который будет переводить на человеческий язык как
Удалить A 0 и A 3:
A = [
150100, 70,30, 20] = [100, 70, 20]Вставьте B 0, B 1, B 4, B 5 в A:
A = [ 300, 200, 100, 70, 60, 50, 20]
Теперь А идентично В.
Требования
Есть несколько важных вещей, которые я хочу отметить:
A - это список, который гарантированно не изменится. B - холодная наблюдаемая, для завершения которой требуется некоторое время, но довольно скоро появляются первые предметы. Следовательно, наблюдаемый результат необходимо сдвинуть, как только будет доступно достаточно данных.
Предметы гарантированно будут уникальными с
IEquatable<T>
в обоих источниках.Элементы не являются изменяемыми и гарантированно сортируются по убыванию
IComparable<T>
в обоих источниках.Предпочтительно оптимизировать для новых элементов, добавляемых слева от B. Это самый распространенный сценарий. Однако возможно, что элементы будут удалены или вставлены в любом другом месте, учитывая, что их временная метка является подходящей (не нарушает сортировку). Подумайте, рулон камеры iPhone.
(*) Меня интересует чисто функциональное решение, если это возможно.
Эскиз псевдокода
Я набросал алгоритм псевдокода, который реализует это в обязательном порядке.
Я помирился Current
, MoveNext
, await
а также yield push
семантика, но идея должна иметь некоторый смысл.
IObservable<Tuple<int, bool>> IndexDelta(a, b)
{
var indexA = 0;
var indexB = 0;
while (true) {
var headA = a.Current;
var headB = b.Current;
if (headA == null && headB == null) {
return yield break; // both sequences are over
}
var reportDeletion = () => {
yield push Tuple.Create(indexA, false);
await a.MoveNext(); // this one is fast
}
var reportInsertion = () => {
yield push Tuple.Create(indexB, true);
await b.MoveNext(); // can take a long time
}
if (headA == null) { // No source item at this position
reportInsertion();
continue;
}
if (headB == null) { // No fetched item at this position
reportDeletion();
continue;
}
switch (headB.CompareTo(headA)) {
case 0:
yield continue;
break;
case 1: // Fetched item is newer than source item
reportInsertion();
break;
case -1: // Source item is newer than fetched item
reportDeletion();
break;
}
indexA++;
indexB++;
}
}
Я считаю, что вы могли бы реализовать что-то очень похожее с Subject<T>
, Однако я не хочу переходить к этому решению, потому что мне интересно, возможно ли решить его чисто путем создания функций Rx, таких как Aggregate
, Zip
или же CombineLatest
,
о чем ты думаешь?
2 ответа
Кажется, работает...
void Main()
{
var a = new int?[] {150, 100, 70, 30, 20 };
var b = new int?[] {300, 200, 100, 70, 60, 50, 20 };
var result = IndexDelta(a, b);
result.Dump();
}
// Define other methods and classes here
IObservable<Tuple<int, bool>> IndexDelta(IEnumerable<int?> a, IEnumerable<int?> b)
{
var observable = Observable.Create<Tuple<int, bool>>(o => {
var indexA = 0;
var indexB = 0;
var aEnumerator = a.GetEnumerator();
var bEnumerator = b.GetEnumerator();
var aHasNext = aEnumerator.MoveNext();
var bHasNext = bEnumerator.MoveNext();
while(true) {
if (aHasNext == false && bHasNext == false) {
"Completed".Dump();
o.OnCompleted(); // both sequences are over
break;
}
var headA = aEnumerator.Current;
var headB = bEnumerator.Current;
headA.Dump("A");
headB.Dump("B");
Action reportDeletion = () => {
o.OnNext(Tuple.Create(indexA, false));
aHasNext = aEnumerator.MoveNext(); // this one is fast
};
Action reportInsertion = () => {
o.OnNext(Tuple.Create(indexB, true));
bHasNext = bEnumerator.MoveNext(); // can take a long time
};
if (headA == null) { // No source item at this position
reportInsertion();
continue;
}
if (headB == null) { // No fetched item at this position
reportDeletion();
continue;
}
switch (headB.Value.CompareTo(headA.Value)) {
case 0:
aHasNext = aEnumerator.MoveNext();
bHasNext = bEnumerator.MoveNext();
indexA++;
indexB++;
break;
case 1: // Fetched item is newer than source item
reportInsertion();
indexB++;
break;
case -1: // Source item is newer than fetched item
reportDeletion();
indexA++;
break;
}
}
return Disposable.Empty;
});
return observable;
}
Этот код основан на ответе Ричарда, но работает с любым T
,
Я не смог избежать проклятия ToEnumerable
хотя - любая помощь приветствуется.
IObservable<Tuple<int, T, bool>> IndexDelta<T>(
IObservable<T> first, IObservable<T> second
)
where T : IComparable, IEquatable<T>
{
return Observable.Create<Tuple<int, T, bool>> (o => {
var a = first.ToEnumerable ().GetEnumerator ();
var b = second.ToEnumerable ().GetEnumerator ();
var indexA = -1;
var indexB = -1;
var hasNextA = true;
var hasNextB = true;
var headA = default(T);
var headB = default(T);
Action<bool> advanceA = (bool reportDeletion) => {
if (reportDeletion) {
o.OnNext (Tuple.Create (indexA, headA, false));
}
if (hasNextA = a.MoveNext ()) {
indexA++;
headA = a.Current;
}
};
Action<bool> advanceB = (bool reportInsertion) => {
if (reportInsertion) {
o.OnNext (Tuple.Create (indexB, headB, true));
}
if (hasNextB = b.MoveNext ()) {
indexB++;
headB = b.Current;
}
};
advanceA (false);
advanceB (false);
while (true) {
if (!hasNextA && !hasNextB) {
o.OnCompleted ();
break;
}
if (!hasNextA) {
advanceB (true);
continue;
}
if (!hasNextB) {
advanceA (true);
continue;
}
switch (headA.CompareTo (headB)) {
case 0:
advanceA (false);
advanceB (false);
break;
case 1:
advanceA (true);
break;
case -1:
advanceB (true);
break;
}
}
return Disposable.Create (() => {
a.Dispose ();
b.Dispose ();
});
});
}