Быстрый способ оценить количество элементов выше заданного порога? Вероятностная структура данных?

У меня есть большой список значений, составленный в диапазоне от 0 до 100 000 (для ясности представлен здесь как буквы). В каждом входе может быть несколько тысяч элементов.

[a a a a b b b b c f d b c f ... ]

Я хочу найти количество чисел с количеством, превышающим определенный порог. Например, если порог равен 3, ответ {a: 4, b: 5},

Очевидный способ сделать это - сгруппировать по идентификаторам, сосчитать каждую группировку и затем отфильтровать.

Это вопрос, не зависящий от языка, но в Clojure (не откладывайте, если вы не знаете Clojure!):

(filter (fn [[k cnt]] (> cnt threshold)) (frequencies input))

Эта функция работает с очень большим количеством входов, каждый вход очень большой, поэтому группировка и фильтрация являются дорогостоящей операцией. Я хочу найти некоторую защитную функцию, которая будет возвращаться раньше, если вход никогда не выдаст никаких выходов сверх заданного порога или иным образом разделит проблемное пространство. Например, наиболее упрощенным является if the size of the input is less than the size of the threshold return nil,

Я ищу лучшую защитную функцию, которая пропустит вычисление, если входные данные не могут дать никаких выходных данных. Или более быстрый способ получения результата.

Очевидно, что это должно быть дешевле, чем сама группировка. Одно отличное решение заключалось в подсчете входных данных по отдельному набору входных данных, но это оказалось таким же дорогим, как группировка...

У меня есть идея, что вероятностные структуры данных могут содержать ключ. Есть идеи?

(Я пометил Hyerloglog, хотя я не думаю, что это применимо, потому что это не обеспечивает счет)

3 ответа

Вы можете посмотреть на Рассказчик. Он предназначен для "анализа и агрегирования потоков данных".

Просто query-seq сделать то, что вы изначально хотите, это:

(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])

(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}

Который вы можете отфильтровать, как вы предложили.

Ты можешь использовать quasi-cardinality чтобы быстро определить количество уникальных элементов в вашем образце (и, следовательно, вопрос о вашем разделе). Для этого он использует алгоритм оценки мощности HyperLogLog, например:

(query-seq (n/quasi-cardinality) my-seq)
==> 3

quasi-frequency-by продемонстрировано здесь:

(defn freq-in-seq
  "returns a function that, when given a value, returns the frequency of that value in the sequence s
   e.g. ((freq-in-seq [:a :a :b :c]) :a)  ==> 2"
  [s]
  (query-seq (n/quasi-frequency-by identity) s))

((freq-in-seq my-seq) :a) ==> 3

quasi-distinct-by:

(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]

Там также анализ потока в реальном времени с query-stream,

Вот кое-что, показывающее, как вы можете сэмплировать поток, чтобы получить количество изменений за считанные значения 'period':

(s/stream->seq 
  (->> my-seq
       (map #(hash-map :timestamp %1 :value %2) (range))
       (query-stream (n/group-by identity n/rate) 
                     {:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})

Результатом является последовательность изменений каждые 3 элемента (период 3) с соответствующей отметкой времени.

Вы также можете написать собственные агрегаторы потоков, которые, вероятно, будут такими, как вы собираете значения в потоке выше. Я быстро с этим справился и ужасно не смог заставить его работать (только на моем обеденном перерыве в данный момент), но это работает на своем месте:

(defn lazy-value-accum
  ([s] (lazy-value-accum s {}))
  ([s m]
   (when-not (empty? s)
     (lazy-seq
      (let [new-map (merge-with + m (:value (first s)))]
        (cons new-map
              (lazy-value-accum (rest s) new-map))))))


(lazy-value-accum
  (s/stream->seq 
    (->> my-seq
         (map #(hash-map :timestamp %1 :value %2) (range))
         (query-stream (n/group-by identity n/rate) 
                       {:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})

который показывает постепенно накапливающееся количество каждого значения после каждого period образцы, которые можно читать лениво.

Как насчет использования partition-all для создания ленивого списка разделов максимального размера n, применения частот к каждому разделу, их слияния и последующей фильтрации окончательной карты?

(defn lazy-count-and-filter
  [coll n threshold]
  (filter #(< threshold (val %))
          (apply (partial merge-with +) 
                 (map frequencies 
                      (partition-all n coll)))))

например:

(lazy-count-and-filter [:a :c :b :c :a :d :a] 2 1)
==> ([:a 3] [:c 2])

Если вы хотите ускорить работу на одном узле, рассмотрите редукторы или core.async, как показано в этом посте.

Если это очень большой набор данных, и эта операция требуется часто, и у вас есть ресурсы для создания многоузлового кластера, вы можете рассмотреть возможность настройки Storm или Onyx.

На самом деле, похоже, что редукторы дадут вам наибольшую пользу при наименьшем объеме работы. Со всеми перечисленными мною опциями более мощные, гибкие и быстрые решения требуют больше времени для понимания. В порядке от самых простых до самых мощных это редукторы, core.async, Storm, Onyx.

Другие вопросы по тегам