Как адаптировать IReduceInit из next.jdbc для потоковой передачи JSON с использованием Cheshire в ответ HTTP с использованием кольца
Tl ;dr как превратить IReduceInit в lazy-seq преобразованных значений
У меня есть запрос к базе данных, который дает достаточно большой набор данных для динамического поворота на клиенте (миллион или две строки, 25 атрибутов - не проблема для современного ноутбука).
Мой (упрощенный) стек должен был вызвать clojure.jdbc, чтобы получить (что я считал ленивым) последовательность строк результатов. Я мог бы просто сериализовать это, передав его как тело через промежуточное программное обеспечение ring-json. Возникла проблема с созданием кольцом-json строки ответа в куче, но в версии 0.5.0 есть возможность передавать ответ в потоковом режиме.
При профилировании нескольких случаев сбоя я обнаружил, что на самом деле clojure.jdbc реализует весь набор результатов в памяти, прежде чем передать его обратно. Нет проблем! Вместо того, чтобы работать сreducible-query
в этой библиотеке я решил перейти на новый next.jdbc.
Ключевой операцией в next.jdbc является plan
который возвращает IReduceInit, который я могу использовать для выполнения запроса и получения набора результатов...
(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]
Однако это реализует весь набор результатов и в приведенном выше случае предоставит мне все идентификаторы заранее и в памяти. Не проблема для одного, но у меня обычно много.
План IReduceInit - это то, что я могу уменьшить, если укажу начальное значение, чтобы я мог сделать вывод в функции сокращения... (thx @amalloy)
(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil
... но в идеале я хотел бы превратить этот IReduceInit в ленивую последовательность значений после применения к ним функции преобразования, чтобы я мог использовать их с ring-json и cheshire. Я не вижу очевидного способа сделать это.
4 ответа
Есть довольно много причин, по которым мой lazy-seq был плохой идеей - даже если я гарантирую, что не буду держать голову, исключительные проблемы во время потоковой передачи результатов, несомненно, оставят ResultSet незаметным - сериализация будет происходить вне стека вызовов, который может убрать.
Потребность в лени вызвана желанием не реализовывать весь результат в памяти, потребностью в seq или другом столбце? так, чтобы промежуточное ПО сериализовало его...
Поэтому сделайте IReduceInit JSONable напрямую, а затем обойдите промежуточное ПО. Если во время сериализации возникнет исключение, элемент управления будет проходить через IReduceInit из next.jdbc, который затем может быть очищен значимо.
;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
ring-protocols/StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
(json/generate-stream body (io/writer output-stream) options)))
;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
cheshire.generate/JSONable
(to-json [^IReduceInit results ^JsonGenerator jg]
(.writeStartArray jg)
(let [rf (fn [_ ^IPersistentMap m]
(cheshire.generate/encode-map m jg))]
(reduce rf nil results))
(.writeEndArray jg)))
;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream
По-прежнему кажется, что над компоновкой этих функций много работы, код адаптера всегда заставляет меня беспокоиться, что мне не хватает простого идиоматического подхода.
reduce
отлично работает с IReduceInit. IReduceInit требует начального значения, которое вы указали при вызове.reduce, но не при использовании функции уменьшения; это объясняет, почему вы видели, что один работает, а другой нет.
Однако это не приведет к ленивой последовательности. Частьreduce
контракт заключается в том, что он охотно потребляет весь ввод (мы проигнорируем reduced
что ничего значимого не меняет). Ваш вопрос является частным случаем более общей проблемы динамической области видимости: последовательность, созданная JDBC, является "действительной" только в некотором контексте, и вам нужно выполнять всю свою обработку в этом контексте, поэтому она не может быть ленивой. Вместо этого вы обычно выворачиваете свою программу наизнанку: не используйте возвращаемое значение как последовательность, а передайте механизму запросов функцию и говорите: "Пожалуйста, вызовите эту функцию со своими результатами". Затем движок гарантирует, что данные действительны, пока он вызывает эту функцию, и как только функция возвращает, он очищает данные. Я не знаю насчет jdbc.next, но со старым jdbc вы бы использовали что-то вродеdb-query-with-resultset
для этого. Вы бы передали ему некоторую функцию, которая могла бы добавлять байты в ожидающий HTTP-ответ, и он вызывал бы эту функцию много раз.
Все это немного расплывчато, потому что я не знаю, какой HTTP-обработчик вы используете, или каковы его возможности для неленивой обработки потоковых ответов, но это общая идея, с которой вам придется идти, если вы хотите обрабатывать ресурс с динамической областью видимости: лень просто не вариант.
IReduceInit позволяет запускать ресурсы JDBC при выходе из функции сокращения. Это гораздо более предсказуемо, чем подход LazySeq, который НИКОГДА не освобождает ресурсы JDBC.
Вы используете BlockingQueue и будущую задачу для заполнения этой очереди следующим образом
(defn lazywalk-reducible
"walks the reducible in chunks of size n,
returns an iterable that permits access"
[n reducible]
(reify java.lang.Iterable
(iterator [_]
(let [bq (java.util.concurrent.ArrayBlockingQueue. n)
finished? (volatile! false)
traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
(vreset! finished? true))]
(reify java.util.Iterator
(hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
(next [_] (.take bq)))))))
Это, конечно, вызовет утечку, если итератор будет создан, но не будет выполнен до его завершения.
Тщательно не тестировал, могут быть и другие проблемы; но такой подход должен работать.
В качестве альтернативы вы могли бы сделать это clojure.lang.ISeq
если Java Iterable не подходит для вашего случая использования; но затем вы начинаете задаваться вопросами HeadRetention; и как обработать звонокObject first()
что было бы вполне выполнимо, но я не хотел обдумывать это
Разочарование.
Почему вы не можете сделать это с помощью JDBC? без каких-либо слоев Clojure?
(let [resultset (.executeQuery connection "select ...")]
(loop
(when (.next resultset)
(let [row [(.getString resultset 1)
(.getString resultset 2)
...]])
(json/send row)
(recur)))
(json/end))
Конечно, с помощью ResultSetMetaData вы можете автоматизировать создание строки в функцию, которая может справиться со всем, что возвращается.