Монадическая складка с Государственной монадой в постоянном пространстве (куча и стек)?
Можно ли выполнить складывание в монаде состояния в постоянном стеке и пространстве кучи? Или другая функциональная техника лучше подходит для моей проблемы?
В следующих разделах описывается проблема и мотивирующий сценарий использования. Я использую Scala, но также приветствуются решения на Haskell.
Сложите в State
Монада наполняет кучу
Предположим, Скалаз 7. Рассмотрим монадическую складку в государственной монаде. Чтобы избежать переполнения стека, мы будем батутировать сгиб.
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
Для большой коллекции col
, это заполнит кучу.
Я считаю, что во время сгиба для каждого значения в коллекции создается закрытие (государственный мобит) x: R
параметр), заполняя кучу. Ни один из них не может быть оценен до run 0
выполняется, обеспечивая исходное состояние.
Можно ли избежать такого использования O(n) кучи?
Более конкретно, может ли начальное состояние быть предоставлено перед сгибом, чтобы монада состояний могла выполняться во время каждого связывания, а не вкладывать замыкания для последующей оценки?
Или можно сложить складку так, чтобы она выполнялась лениво после того, как государственная монада run
? Таким образом, следующий x: R
Закрытие не будет создано до тех пор, пока предыдущие не будут оценены и сделаны пригодными для сбора мусора.
Или есть лучшая функциональная парадигма для такой работы?
Пример приложения
Но, возможно, я использую не тот инструмент для работы. Развитие примера использования приведено ниже. Я брожу по неправильному пути здесь?
Рассмотрим выборку из резервуара, т. Е. Выборку за один проход k
предметы из коллекции слишком велики, чтобы уместиться в памяти. В Scala такая функция может быть
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
и если прыщ в TraversableOnce
Тип может быть использован как это
val tenRandomInts = (Int.Min to Int.Max) sample 10
Работа сделана sample
по сути fold
:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
Тем не мение, update
с состоянием; это зависит от n
, количество предметов уже видел. (Это также зависит от ГСЧ, но для простоты я предполагаю, что это глобально и с сохранением состояния. Методы, используемые для обработки n
будет распространяться тривиально.) Так как справиться с этим состоянием?
Нечистое решение простое и работает с постоянным стеком и кучей.
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
Но как насчет чисто функционального решения? update
должен взять n
в качестве дополнительного параметра и вернуть новое значение вместе с обновленным образцом. Мы могли бы включить n
в неявном состоянии накопитель складок, например,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
Но это затемняет намерение; мы действительно намерены накапливать вектор выборки. Эта проблема, кажется, готова для Государственной монады и монадической левой складки. Давай еще раз попробуем.
Мы будем использовать Scalaz 7, с этим импортом
import scalaz._
import Scalaz._
import scalaz.std.iterable_
и работать над Iterable[A]
, поскольку Скалаз не поддерживает монадическое сворачивание Traversable
,
sample
теперь определено
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
где обновление
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
К сожалению, это уносит стек в большую коллекцию.
Так что давайте батут это. sample
сейчас
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
где обновление
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
Это исправляет переполнение стека, но все равно уносит кучу для очень больших коллекций (или очень маленьких куч). Одна анонимная функция для каждого значения в коллекции создается во время сгиба (я думаю, что закрывать каждую x: A
параметр), потребляя кучу, прежде чем батут даже запустить. (FWIW, версия State тоже имеет эту проблему; переполнение стека только сначала появляется с меньшими коллекциями.)
2 ответа
Наша настоящая проблема - куча, используемая неисполненными государственными мобами.
Нет. Реальная проблема заключается в том, что коллекция не помещается в памяти и что foldLeftM
а также foldRightM
заставить всю коллекцию. Побочным эффектом нечистого решения является то, что вы освобождаете память по ходу дела. В "чисто функциональном" решении вы нигде этого не делаете.
Ваше использование Iterable
игнорирует важную деталь: какая коллекция col
на самом деле, как его элементы создаются и как они должны быть отброшены. И так, обязательно, делает foldLeftM
на Iterable
, Вероятно, он слишком строг, и вы помещаете всю коллекцию в память. Например, если это Stream
тогда, пока вы держитесь col
все элементы, вынужденные до сих пор, будут в памяти. Если это какой-то другой ленивый Iterable
это не запоминает его элементы, тогда сгиб все еще слишком строг.
Я попробовал ваш первый пример с EphemeralStream
не видел какого-либо значительного давления в куче, хотя он явно будет иметь те же "неисполненные государственные толпы". Разница в том, что EphemeralStream
на элементы слабо ссылаются и его foldRight
не заставляет весь поток.
Я подозреваю, что если вы использовали Foldable.foldr
тогда вы не увидите проблемного поведения, поскольку оно сворачивается с функцией, которая ленива во втором аргументе. Когда вы вызываете фолд, вы хотите, чтобы он немедленно возвращал подвеску, которая выглядит примерно так:
Suspend(() => head |+| tail.foldRightM(...))
Когда батут возобновит первую подвеску и подойдет к следующей подвеске, все распределения между подвесками станут доступными для освобождения сборщиком мусора.
Попробуйте следующее:
def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
if (bs.isEmpty) Monad[M].point(a)
else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))
val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._
foldM[M,R,Int](Monoid[R].zero, col) {
(x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run
Это будет работать в постоянной куче для батута монады M
, но переполняет стек для нетрамплиндовой монады.
Но настоящая проблема в том, что Iterable
не очень хорошая абстракция для данных, которые слишком велики, чтобы поместиться в памяти. Конечно, вы можете написать императивную программу с побочными эффектами, в которой вы явно отбрасываете элементы после каждой итерации или используете ленивое правое сгибание. Это работает хорошо, пока вы не захотите сочинить эту программу с другой. И я предполагаю, что вся причина, по которой вы расследуете это в State
Монада для начала состоит в том, чтобы получить композиционность.
Так что ты можешь сделать? Вот несколько вариантов:
- Использовать
Reducer
,Monoid
и его композицию, затем запускают в императивном явном освобождении петли (или ленивая правая складка на батуте) в качестве последнего шага, после которого композиция невозможна или не ожидается. - использование
Iteratee
состав и монадикаEnumerator
s, чтобы накормить их. - Напишите композиционные потоковые преобразователи со Scalaz-Stream.
Последний из этих вариантов - тот, который я бы использовал и рекомендовал в общем случае.
С помощью State
или любая подобная монада, не очень хороший подход к проблеме. С помощью State
приговорен к разносу стека / кучи на больших коллекциях. Рассмотрим значение x: State[A,B]
построенный из большой коллекции (например, складывая ее). затем x
можно оценить по разным значениям исходного состояния A
, дающий разные результаты. Так x
Необходимо сохранить всю информацию, содержащуюся в коллекции. В чистых настройках, x
не могу забыть некоторую информацию, чтобы не уничтожить стек / кучу, поэтому все, что вычисляется, остается в памяти до тех пор, пока не будет освобождено все монадическое значение, что происходит только после оценки результата. Так что потребление памяти x
пропорционально размеру коллекции.
Я считаю, что подходящим подходом к этой проблеме является использование функциональных итераций / каналов / каналов. Эта концепция (упоминаемая под этими тремя именами) была изобретена для обработки больших коллекций данных с постоянным потреблением памяти и для описания таких процессов с использованием простого комбинатора.
Я пытался использовать Скалаз Iteratees
, но кажется, что эта часть еще не созрела, она страдает от переполнения стека так же, как State
(или, возможно, я не правильно его использую; код доступен здесь, если кому-то интересно).
Тем не менее, это было просто, используя мою (все еще немного экспериментальную) библиотеку Scala-Conduit (отказ от ответственности: я автор):
import conduit._
import conduit.Pipe._
object Run extends App {
// Define a sampling function as a sink: It consumes
// data of type `A` and produces a vector of samples.
def sampleI[A](k: Int): Sink[A, Vector[A]] =
sampleI[A](k, 0, Vector())
// Create a sampling sink with a given state. It requests
// a value from the upstream conduit. If there is one,
// update the state and continue (the first argument to `requestF`).
// If not, return the current sample (the second argument).
// The `Finalizer` part isn't important for our problem.
private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
Sink[A, Vector[A]] =
requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
(_: Any) => sample)(Finalizer.empty)
// The sampling algorithm copied from the question.
val rand = new scala.util.Random()
def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
// Construct an iterable of all `short` values, pipe it into our sampling
// funcition, and run the combined pipe.
{
print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
sampleI(10)))
}
}
Обновление: можно было бы решить проблему, используя State
, но нам нужно реализовать пользовательский фолд специально для State
который знает, как сделать это постоянным пространством:
import scala.collection._
import scala.language.higherKinds
import scalaz._
import Scalaz._
import scalaz.std.iterable._
object Run extends App {
// Folds in a state monad over a foldable
def stateFold[F[_],E,S,A](xs: F[E],
f: (A, E) => State[S,A],
z: A)(implicit F: Foldable[F]): State[S,A] =
State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))
// Sample a lazy collection view
def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
State[Int,Vector[A]] =
stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())
// update using State monad
def update[A](k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...
{
print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
}
}