Реализация Rx.Net GroupBy отсутствует последовательность элементов редко

Я использую RX 2.2.5. У меня есть 2 просмотра, которые загружают суб-заказы с

           _transportService
            .ObserveSubOrder(parentOrder.OrderId)
            .SubscribeOn(_backgroundScheduler)
            .ObserveOn(_uiScheduler)
            .Where(subOs => subOs != null)                
            .Snoop("BeforeGrpBy")
            .GroupBy(subOs => subOs.OrderId)
            .Subscribe(subOrdUpdates =>
            {
                AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));                        
            })

Прежде чем groupBy получит всю последовательность элементов, после groupby возникает проблема, что он очень редко пропускает последовательность элементов. Я не думаю, что проблема параллелизма, как это видно из журналов. Пользовательский метод расширения Snoop используется для создания этих журналов.

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})

Формат времени: (Тема): Msg

Как вы можете видеть ранее, groupby onNext вызывается дважды, но после пропускается один. Что-то не так с грамматикой Rx здесь или это известная проблема? Любое понимание поможет? Если какие-либо дополнительные разъяснения требуется, пожалуйста, прокомментируйте.

Обновление: Добавление рабочих / желательных логов:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})

Обновление 2: возможная ошибка или функция

GroupBy запускает groupedObservable, только если fireNewMapEntry имеет значение true ( GroupBy.cs), и это происходит здесь

 if (!_map.TryGetValue(key, out writer))
 {
    writer = new Subject<TElement>();
    _map.Add(key, writer);
    fireNewMapEntry = true;
  }

где _map имеет тип Dictionary<TKey, ISubject<TElement>>, Это может быть проблемой?

2 ответа

Просто некоторые заметки о вашем стиле кода (извините, это не совсем ответ, так как я думаю, что @supertopi ответил)

  1. Переместить вас SubscribeOn а также ObserveOn звонки, чтобы быть последним, что вы делаете перед окончательной подпиской. В текущем коде вы выполняете Where, Snoop и GroupBy все на _uiScheduler занимая драгоценные циклы.

  2. Избегайте подписки с подпиской. Похоже, что AddIfNew берет ключ и IObservable<T>Таким образом, я предполагаю, что он выполняет некоторую подписку внутри. Вместо этого опирайтесь на то, что вы знаете. Если вы используете GroupBy, то вы знаете, что ключ будет уникальным при первом получении группы. Так что теперь это может быть просто Add (если это ключ, который вы проверяете). Вы также можете использовать Take(1) если вы хотите быть явным. Если это значение, а не ключ, который вы проверяете, тогда GroupBy представляется излишним.

  3. Постарайтесь, чтобы имена переменных были согласованы, так как другой разработчик читает запрос, и он хорошо руководствуется, а не переключается между subOs, childOs а также childUpdates, когда childOrder кажется, лучшее имя (IMO)

  4. В идеале не возвращайте нулевые значения в вашей наблюдаемой последовательности. Какой цели это служит? Это может иметь смысл в некоторых редких случаях, но часто я нахожу, что нуль используется вместо OnCompleted чтобы указать, что нет никаких значений для этой последовательности.

например

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .GroupBy(childOrder => childOrder.OrderId)
        .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(newGroup =>
        {
            Add(newGroup.Item1, newGroup.Item2);                        
        },
          ex=>//obviously we have error handling here ;-)
        );

или же

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(childOrder =>
          {
             AddIfNew(childOrder.OrderId, childOrder);                             
          },
          ex=>//obviously we have error handling here ;-)
        );

и даже лучше (без проверок и нулевых проверок)

var subscription = _transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(
          childOrder => AddIfNew(childOrder.OrderId, childOrder),
          ex=>//obviously we have error handling here ;-)
        );

НТН

Вы скучаете по природе GroupBy,

Оператор выбрасывает OnNext только после появления новой группы (см. реализацию GroupBy.cs: 67). В вашем случае orderID равно для обоих уведомлений, поэтому только один OnNext испускается

Значение, выбрасываемое оператором, имеет IGroupedObservable<T> на который вы можете подписаться, если вам нужен доступ к дальнейшим уведомлениям внутри группы.

Другие вопросы по тегам