Как адаптировать IReduceInit из next.jdbc для потоковой передачи JSON с использованием Cheshire в ответ HTTP с использованием кольца

Tl ;dr как превратить IReduceInit в lazy-seq преобразованных значений

У меня есть запрос к базе данных, который дает достаточно большой набор данных для динамического поворота на клиенте (миллион или две строки, 25 атрибутов - не проблема для современного ноутбука).

Мой (упрощенный) стек должен был вызвать clojure.jdbc, чтобы получить (что я считал ленивым) последовательность строк результатов. Я мог бы просто сериализовать это, передав его как тело через промежуточное программное обеспечение ring-json. Возникла проблема с созданием кольцом-json строки ответа в куче, но в версии 0.5.0 есть возможность передавать ответ в потоковом режиме.

При профилировании нескольких случаев сбоя я обнаружил, что на самом деле clojure.jdbc реализует весь набор результатов в памяти, прежде чем передать его обратно. Нет проблем! Вместо того, чтобы работать сreducible-query в этой библиотеке я решил перейти на новый next.jdbc.

Ключевой операцией в next.jdbc является plan который возвращает IReduceInit, который я могу использовать для выполнения запроса и получения набора результатов...

(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]

Однако это реализует весь набор результатов и в приведенном выше случае предоставит мне все идентификаторы заранее и в памяти. Не проблема для одного, но у меня обычно много.

План IReduceInit - это то, что я могу уменьшить, если укажу начальное значение, чтобы я мог сделать вывод в функции сокращения... (thx @amalloy)

(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil

... но в идеале я хотел бы превратить этот IReduceInit в ленивую последовательность значений после применения к ним функции преобразования, чтобы я мог использовать их с ring-json и cheshire. Я не вижу очевидного способа сделать это.

4 ответа

Решение

Есть довольно много причин, по которым мой lazy-seq был плохой идеей - даже если я гарантирую, что не буду держать голову, исключительные проблемы во время потоковой передачи результатов, несомненно, оставят ResultSet незаметным - сериализация будет происходить вне стека вызовов, который может убрать.

Потребность в лени вызвана желанием не реализовывать весь результат в памяти, потребностью в seq или другом столбце? так, чтобы промежуточное ПО сериализовало его...

Поэтому сделайте IReduceInit JSONable напрямую, а затем обойдите промежуточное ПО. Если во время сериализации возникнет исключение, элемент управления будет проходить через IReduceInit из next.jdbc, который затем может быть очищен значимо.

;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
  ring-protocols/StreamableResponseBody
  (write-body-to-stream [_ _ output-stream]
    (json/generate-stream body (io/writer output-stream) options)))
 
;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
  cheshire.generate/JSONable
  (to-json [^IReduceInit results ^JsonGenerator jg]
    (.writeStartArray jg)
    (let [rf (fn [_ ^IPersistentMap m]
               (cheshire.generate/encode-map m jg))]
      (reduce rf nil results))
    (.writeEndArray jg)))

;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream

По-прежнему кажется, что над компоновкой этих функций много работы, код адаптера всегда заставляет меня беспокоиться, что мне не хватает простого идиоматического подхода.

reduceотлично работает с IReduceInit. IReduceInit требует начального значения, которое вы указали при вызове.reduce, но не при использовании функции уменьшения; это объясняет, почему вы видели, что один работает, а другой нет.

Однако это не приведет к ленивой последовательности. Частьreduceконтракт заключается в том, что он охотно потребляет весь ввод (мы проигнорируем reducedчто ничего значимого не меняет). Ваш вопрос является частным случаем более общей проблемы динамической области видимости: последовательность, созданная JDBC, является "действительной" только в некотором контексте, и вам нужно выполнять всю свою обработку в этом контексте, поэтому она не может быть ленивой. Вместо этого вы обычно выворачиваете свою программу наизнанку: не используйте возвращаемое значение как последовательность, а передайте механизму запросов функцию и говорите: "Пожалуйста, вызовите эту функцию со своими результатами". Затем движок гарантирует, что данные действительны, пока он вызывает эту функцию, и как только функция возвращает, он очищает данные. Я не знаю насчет jdbc.next, но со старым jdbc вы бы использовали что-то вродеdb-query-with-resultsetдля этого. Вы бы передали ему некоторую функцию, которая могла бы добавлять байты в ожидающий HTTP-ответ, и он вызывал бы эту функцию много раз.

Все это немного расплывчато, потому что я не знаю, какой HTTP-обработчик вы используете, или каковы его возможности для неленивой обработки потоковых ответов, но это общая идея, с которой вам придется идти, если вы хотите обрабатывать ресурс с динамической областью видимости: лень просто не вариант.

IReduceInit позволяет запускать ресурсы JDBC при выходе из функции сокращения. Это гораздо более предсказуемо, чем подход LazySeq, который НИКОГДА не освобождает ресурсы JDBC.

Вы используете BlockingQueue и будущую задачу для заполнения этой очереди следующим образом

 (defn lazywalk-reducible
  "walks the reducible in chunks of size n,
  returns an iterable that permits access"
  [n reducible]
  (reify java.lang.Iterable
    (iterator [_]
      (let [bq (java.util.concurrent.ArrayBlockingQueue. n)
            finished? (volatile! false)
            traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
                              (vreset! finished? true))]
        (reify java.util.Iterator
          (hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
          (next [_] (.take bq)))))))

Это, конечно, вызовет утечку, если итератор будет создан, но не будет выполнен до его завершения.

Тщательно не тестировал, могут быть и другие проблемы; но такой подход должен работать.

В качестве альтернативы вы могли бы сделать это clojure.lang.ISeqесли Java Iterable не подходит для вашего случая использования; но затем вы начинаете задаваться вопросами HeadRetention; и как обработать звонокObject first() что было бы вполне выполнимо, но я не хотел обдумывать это

Разочарование.

Почему вы не можете сделать это с помощью JDBC? без каких-либо слоев Clojure?

(let [resultset (.executeQuery connection "select ...")]
  (loop 
   (when (.next resultset)
     (let [row [(.getString resultset 1)
                (.getString resultset 2)
                ...]])
     (json/send row)
     (recur)))
  (json/end))

Конечно, с помощью ResultSetMetaData вы можете автоматизировать создание строки в функцию, которая может справиться со всем, что возвращается.

Другие вопросы по тегам