Реализация Rx.Net GroupBy отсутствует последовательность элементов редко
Я использую RX 2.2.5. У меня есть 2 просмотра, которые загружают суб-заказы с
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Where(subOs => subOs != null)
.Snoop("BeforeGrpBy")
.GroupBy(subOs => subOs.OrderId)
.Subscribe(subOrdUpdates =>
{
AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));
})
Прежде чем groupBy получит всю последовательность элементов, после groupby возникает проблема, что он очень редко пропускает последовательность элементов. Я не думаю, что проблема параллелизма, как это видно из журналов. Пользовательский метод расширения Snoop используется для создания этих журналов.
16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
Формат времени: (Тема): Msg
Как вы можете видеть ранее, groupby onNext вызывается дважды, но после пропускается один. Что-то не так с грамматикой Rx здесь или это известная проблема? Любое понимание поможет? Если какие-либо дополнительные разъяснения требуется, пожалуйста, прокомментируйте.
Обновление: Добавление рабочих / желательных логов:
16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
Обновление 2: возможная ошибка или функция
GroupBy запускает groupedObservable, только если fireNewMapEntry имеет значение true ( GroupBy.cs), и это происходит здесь
if (!_map.TryGetValue(key, out writer))
{
writer = new Subject<TElement>();
_map.Add(key, writer);
fireNewMapEntry = true;
}
где _map имеет тип Dictionary<TKey, ISubject<TElement>>
, Это может быть проблемой?
2 ответа
Просто некоторые заметки о вашем стиле кода (извините, это не совсем ответ, так как я думаю, что @supertopi ответил)
Переместить вас
SubscribeOn
а такжеObserveOn
звонки, чтобы быть последним, что вы делаете перед окончательной подпиской. В текущем коде вы выполняетеWhere
,Snoop
иGroupBy
все на_uiScheduler
занимая драгоценные циклы.Избегайте подписки с подпиской. Похоже, что
AddIfNew
берет ключ иIObservable<T>
Таким образом, я предполагаю, что он выполняет некоторую подписку внутри. Вместо этого опирайтесь на то, что вы знаете. Если вы используете GroupBy, то вы знаете, что ключ будет уникальным при первом получении группы. Так что теперь это может быть просто Add (если это ключ, который вы проверяете). Вы также можете использоватьTake(1)
если вы хотите быть явным. Если это значение, а не ключ, который вы проверяете, тогдаGroupBy
представляется излишним.Постарайтесь, чтобы имена переменных были согласованы, так как другой разработчик читает запрос, и он хорошо руководствуется, а не переключается между
subOs
,childOs
а такжеchildUpdates
, когдаchildOrder
кажется, лучшее имя (IMO)В идеале не возвращайте нулевые значения в вашей наблюдаемой последовательности. Какой цели это служит? Это может иметь смысл в некоторых редких случаях, но часто я нахожу, что нуль используется вместо
OnCompleted
чтобы указать, что нет никаких значений для этой последовательности.
например
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.GroupBy(childOrder => childOrder.OrderId)
.SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(newGroup =>
{
Add(newGroup.Item1, newGroup.Item2);
},
ex=>//obviously we have error handling here ;-)
);
или же
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(childOrder =>
{
AddIfNew(childOrder.OrderId, childOrder);
},
ex=>//obviously we have error handling here ;-)
);
и даже лучше (без проверок и нулевых проверок)
var subscription = _transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(
childOrder => AddIfNew(childOrder.OrderId, childOrder),
ex=>//obviously we have error handling here ;-)
);
НТН
Вы скучаете по природе GroupBy
,
Оператор выбрасывает OnNext
только после появления новой группы (см. реализацию GroupBy.cs: 67). В вашем случае orderID
равно для обоих уведомлений, поэтому только один OnNext
испускается
Значение, выбрасываемое оператором, имеет IGroupedObservable<T>
на который вы можете подписаться, если вам нужен доступ к дальнейшим уведомлениям внутри группы.