Как реактивные потоки используются в Slick для вставки данных
В документации Slick приведены примеры использования Reactive Streams только для чтения данных как средства DatabasePublisher. Но что произойдет, если вы захотите использовать свою базу данных как Sink и обратно в зависимости от скорости вставки?
Я искал эквивалентный DatabaseSubscriber, но он не существует. Итак, вопрос в том, есть ли у меня Источник, скажите:
val source = Source(0 to 100)
Как я могу создать Sink с Slick, который записывает эти значения в таблицу со схемой:
create table NumberTable (value INT)
2 ответа
Серийные Вставки
Самый простой способ - сделать вставки внутри Sink.foreach
,
Предполагая, что вы использовали генерацию кода схемы и далее предполагаете, что ваша таблица называется "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
Мы можем написать функцию, которая делает вставку
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
И эта функция может быть помещена в раковину
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
Пакетные вставки
Вы можете дополнительно расширить методологию Sink, выполняя пакетирование по N вставок за раз:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
Этот порционный Раковина может питаться Flow
которая выполняет групповую группировку:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
Хотя вы можете использовать Sink.foreach
чтобы достичь этого (как упомянул Рамон), безопаснее и, вероятно, быстрее (при параллельном запуске вставок) использовать mapAsync
Flow
, Проблема, с которой вы столкнетесь при использовании Sink.foreach
является то, что он не имеет возвращаемого значения. Вставка в базу данных с помощью сликов db.run
метод возвращает Future
который затем выйдет из пар вернул Future[Done]
которая завершается, как только Sink.foreach
отделки.
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
С другой стороны def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
позволяет вам запускать вставки параллельно через параметр параллелизма и принимает функцию от значения outstream out для будущего некоторого типа. Это соответствует нашему i => db.run(numbers += i)
функция. Самое замечательное в этом Flow
является то, что он затем кормит результат этих Futures
вниз по течению.
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
Чтобы доказать свою точку зрения, вы даже можете вернуть реальный результат из потока, а не Future[Done]
(С Готовым, представляющим Единицу). Этот поток также добавит более высокое значение параллелизма и пакетную обработку для дополнительной производительности. *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- Примечание: вы, вероятно, не увидите лучшей производительности для такого небольшого набора данных, но когда я имел дело со вставкой 1,7 м, я смог добиться наилучшей производительности на моей машине с размером пакета 1000 и значением параллелизма 8, локально с postgresql. Это было примерно в два раза лучше, чем не работать параллельно. Как всегда, когда речь идет о производительности, ваши результаты могут отличаться, и вы должны измерить для себя.
Я считаю, что документация Alpakka превосходна, а DSL действительно упрощает работу с реактивными потоками.
Это документы для Slick: https://doc.akka.io/docs/alpakka/current/slick.html
Пример вставки:
Source(0 to 100)
.runWith(
// add an optional first argument to specify the parallelism factor (Int)
Slick.sink(value => sqlu"INSERT INTO NumberTable VALUES(${value})")
)