Можно ли создать "бесконечный" поток из таблицы базы данных, используя Akka Stream
Я играю с Akka Streams 2.4.2 и задаюсь вопросом, возможно ли настроить поток, который использует таблицу базы данных для источника, и когда в таблицу добавляется запись, эта запись материализуется и отправляется вниз по течению?
ОБНОВЛЕНИЕ: 23.02.16
Я реализовал решение от @PH88. Вот мое определение таблицы:
case class Record(id: Int, value: String)
class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
def id = column[Int]("id")
def value = column[String]("value")
def * = (id, value) <> (Record.tupled, Record.unapply)
}
Вот реализация:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
try{
val newRecStream = Source.unfold((0, List[Record]())) { n =>
try {
val q = for (r <- TableQuery[Records].filter(row => row.id > n._1)) yield (r)
val r = Source.fromPublisher(db.stream(q.result)).collect {
case rec => println(s"${rec.id}, ${rec.value}"); rec
}.runFold((n._1, List[Record]())) {
case ((id, xs), current) => (current.id, current :: xs)
}
val answer: (Int, List[Record]) = Await.result(r, 5.seconds)
Option(answer, None)
}
catch { case e:Exception => println(e); Option(n, e) }
}
Await.ready(newRecStream.throttle(1, 1.second, 1, ThrottleMode.shaping).runForeach(_ => ()), Duration.Inf)
}
finally {
system.shutdown
db.close
}
Но моя проблема в том, что когда я пытаюсь позвонить flatMapConcat
тип, который я получаю, Serializable
,
ОБНОВЛЕНИЕ: 24.02.16
Обновлено, чтобы попробовать db.run
предложение от @PH88:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val disableAutoCommit = SimpleDBIO(_.connection.setAutoCommit(false))
val queryLimit = 1
try {
val newRecStream = Source.unfoldAsync(0) { n =>
val q = TableQuery[Records].filter(row => row.id > n).take(queryLimit)
db.run(q.result).map { recs =>
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(s"${rec.id}, ${rec.value}")
}
Await.ready(newRecStream, Duration.Inf)
}
catch
{
case ex: Throwable => println(ex)
}
finally {
system.shutdown
db.close
}
Что работает (я изменил лимит запросов на 1, так как в моей таблице базы данных в настоящее время есть только пара элементов) - за исключением того, что он печатает последнюю строку в таблице, в которой существует программа. Вот мой вывод журнала:
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/xxxxxxx/dev/src/scratch/scala/fpp-in-scala/target/scala-2.11/classes/logback.xml]
17:09:28,062 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:09:28,064 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:09:28,079 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
17:09:28,102 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [application] to DEBUG
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
17:09:28,103 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:09:28,104 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@4278284b - Registering current configuration as safe fallback point
17:09:28.117 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
1, WASSSAAAAAAAP!
2, WHAAAAT?!?
3, booyah!
4, what!
5, This rocks!
6, Again!
7, Again!2
8, I love this!
9, Akka Streams rock
10, Tuning jdbc
17:09:39.000 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
Process finished with exit code 0
Найден недостающий кусок - нужно заменить это:
Some(recs.last.id, recs)
с этим:
val lastId = if(recs.isEmpty) n else recs.last.id
Some(lastId, recs)
Звонил recs.last.id java.lang.UnsupportedOperationException: empty.last
когда набор результатов был пуст.
3 ответа
В общем случае база данных SQL является "пассивной" конструкцией и не активно проталкивает изменения, подобные описанным вами. Вы можете только "смоделировать" "толчок" с периодическим опросом, например:
val newRecStream = Source
// Query for table changes
.unfold(initState) { lastState =>
// query for new data since lastState and save the current state into newState...
Some((newState, newRecords))
}
// Throttle to limit the poll frequency
.throttle(...)
// breaks down into individual records...
.flatMapConcat { newRecords =>
Source.unfold(newRecords) { pendingRecords =>
if (records is empty) {
None
} else {
// take one record from pendingRecords and save to newRec. Save the rest into remainingRecords.
Some(remainingRecords, newRec)
}
}
}
Обновлено: 24.02.2016
Пример псевдокода, основанный на обновлении вопроса от 23.02.2016:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val queryLimit = 10
try {
val completion = Source
.unfoldAsync(0) { lastRowId =>
val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
db.run(q.result).map { recs =>
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(s"${rec.id}, ${rec.value}")
}
// Block forever
Await.ready(completion, Duration.Inf)
} catch {
case ex: Throwable => println(ex)
} finally {
system.shutdown
db.close
}
Он будет многократно выполнять запрос в unfoldAsync
против БД, получая не более 10 (queryLimit
) записывает время и отправляет записи вниз по течению (-> throttle
-> flatMapConcat
-> runForeach
). Await
в конце фактически заблокирует навсегда.
Обновлено: 25.02.2016
Исполняемый код для проверки концепции:
import akka.actor.ActorSystem
import akka.stream.{ThrottleMode, ActorMaterializer}
import akka.stream.scaladsl.Source
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
object Infinite extends App{
implicit val system = ActorSystem("Publisher")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
case class Record(id: Int, value: String)
try {
val completion = Source
.unfoldAsync(0) { lastRowId =>
Future {
val recs = (lastRowId to lastRowId + 10).map(i => Record(i, s"rec#$i"))
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.Shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(rec)
}
Await.ready(completion, Duration.Inf)
} catch {
case ex: Throwable => println(ex)
} finally {
system.shutdown
}
}
Вот рабочий код бесконечной потоковой передачи базы данных. Это было протестировано с миллионами записей, вставленными в базу данных postgresql во время работы потокового приложения -
package infinite.streams.db
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.slf4j.LoggerFactory
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor}
case class Record(id: Int, value: String) {
val content = s"<ROW><ID>$id</ID><VALUE>$value</VALUE></ROW>"
}
object InfiniteStreamingApp extends App {
println("Starting app...")
implicit val system: ActorSystem = ActorSystem("Publisher")
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
println("Initializing database configuration...")
val databaseConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig[JdbcProfile]("postgres3")
implicit val session: SlickSession = SlickSession.forConfig(databaseConfig)
import databaseConfig.profile.api._
class Records(tag: Tag) extends Table[Record](tag, "test2") {
def id = column[Int]("c1")
def value = column[String]("c2")
def * = (id, value) <> (Record.tupled, Record.unapply)
}
val db = databaseConfig.db
println("Prime for streaming...")
val logic: Flow[(Int, String), (Int, String), NotUsed] = Flow[(Int, String)].map {
case (id, value) => (id, value.toUpperCase)
}
val fetchSize = 5
try {
val done = Source
.unfoldAsync(0) {
lastId =>
println(s"Fetching next: $fetchSize records with id > $lastId")
val query = TableQuery[Records].filter(_.id > lastId).take(fetchSize)
db.run(query.result.withPinnedSession)
.map {
recs => Some(recs.last.id, recs)
}
}
.throttle(5, 1.second, 1, ThrottleMode.shaping)
.flatMapConcat {
recs => Source.fromIterator(() => recs.iterator)
}
.map(x => (x.id, x.content))
.via(logic)
.log("*******Post Transformation******")
// .runWith(Sink.foreach(r => println("SINK: " + r._2)))
// Use runForeach or runWith(Sink)
.runForeach(rec => println("REC: " + rec))
println("Waiting for result....")
Await.ready(done, Duration.Inf)
} catch {
case ex: Throwable => println(ex.getMessage)
} finally {
println("Streaming end successfully")
db.close()
system.terminate()
}
}
application.conf
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
}
# Load using SlickSession.forConfig("slick-postgres")
postgres3 {
profile = "slick.jdbc.PostgresProfile$"
db {
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost/testdb"
user = "postgres"
password = "postgres"
}
numThreads = 2
}
}
Одна из возможностей заключается в том, что вы можете настроить развертывание источника для настройки бесконечного потока.
https://doc.akka.io/docs/akka/current/stream/operators/Source/unfold.html
с этим вы можете настроить источник следующим образом: val BeginOffset = 0L val fetchSize = 10000
(Source.unfold(initialOffset) { current =>
fetchOnlyMethod(offset) match {
case values if values.isEmpty => Some(offset, values)
case values => Some((offset++fetchSize), values)
}
})
затем используйте метод fetchOnly, чтобы получить фрагмент данных от смещения до предела выборки.
если вы хотите, чтобы источник останавливался, когда он достигает конца таблицы, вы можете заменить предложение «values.isEmpty» на:
case values if values.isEmpty => None
логика развертывания будет продолжать работать до тех пор, пока не достигнет значения None. С точки зрения используемого кортежа, первое значение — это то, что повторяется в логике свертки, а второе значение — это то, что передается в потоке с каждым циклом.