Как я могу объединить два потока, упорядоченных и сгруппированных по меткам времени?

У меня есть два потока объектов, каждый из которых имеет Timestamp значение. Оба потока расположены по порядку, поэтому, например, временные метки могут быть Ta = 1,3,6,6,7 в одном потоке и Тб = 1,2,5,5,6,8 в другом. Объекты в обоих потоках относятся к одному типу.

То, что я хотел бы сделать, это поместить каждое из этих событий в шину в порядке отметки времени, то есть, поставить A1, затем B1, B2, A3 и так далее. Кроме того, поскольку некоторые потоки имеют несколько (последовательных) элементов с одинаковой временной меткой, я хочу, чтобы эти элементы были сгруппированы так, чтобы каждое новое событие представляло собой массив. Таким образом, мы поместили бы [A3] в автобус, затем [A15, A25] и так далее.

Я попытался реализовать это, сделав два ConcurrentQueue структуры, помещая каждое событие в конец очереди, затем просматривая каждый фронт очереди, выбирая сначала более раннее событие, а затем пересекая очередь так, чтобы присутствовали все события с этой отметкой времени.

Однако я столкнулся с двумя проблемами:

  • Если я оставляю эти очереди неограниченными, я быстро исчерпываю память, поскольку операция чтения намного быстрее, чем обработчики, получающие события. (У меня есть несколько гигабайт данных).
  • Иногда я сталкиваюсь с ситуацией, когда я справляюсь с событием, скажем, A15 до прибытия A25. Мне как-то нужно остерегаться этого.

Я думаю, что Rx может помочь в этом отношении, но я не вижу очевидного комбинатора (ов), чтобы сделать это возможным. Таким образом, любой совет очень ценится.

1 ответ

Решение

Rx действительно хорошо подходит для этой проблемы IMO.

IObservables не могу 'OrderBy' по очевидным причинам (вам нужно сначала наблюдать весь поток, чтобы гарантировать правильный порядок вывода), поэтому мой ответ ниже предполагает (что вы заявили), что ваши 2 исходных потока событий находятся в порядке.

В конце концов, это была интересная проблема. Стандартные операторы Rx отсутствуют GroupByUntilChanged это решило бы это легко, если бы оно OnComplete на предыдущей группе наблюдается, когда наблюдается первый элемент следующей группы. Однако, глядя на реализацию DistinctUntilChanged это не следует этому шаблону и только звонки OnComplete когда источник, наблюдаемый завершается (даже если он знает, что после первого не различимого элемента больше не будет элементов... странно???). Во всяком случае, по этим причинам я решил против GroupByUntilChanged метод (чтобы не нарушать соглашения Rx) и пошел вместо ToEnumerableUntilChanged,

Отказ от ответственности: это мое первое расширение Rx, поэтому буду признателен за отзывы о моем выборе. Кроме того, одной из моих основных проблем является анонимное наблюдение distinctElements список.

Во-первых, код вашего приложения довольно прост:

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }

    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 

    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }

Теперь для ToEnumerableUntilChanged реализация (предупреждение о коде):

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;

        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();

            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }

                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }

                observer.OnNext( distinctElements);

                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;

            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);

                observer.OnCompleted();
            });
        });
    }
Другие вопросы по тегам