Установление синглтон-соединения с Google Cloud Bigtable в Scala, аналогичного Cassandra
Я пытаюсь внедрить систему рекомендаций в реальном времени, используя облачные сервисы Google. Я уже собрал движок, используя Kafka, Apache Storm и Cassandra, но я хочу создать тот же движок в Scala, используя Cloud Pub/Sub, Cloud Dataflow и Cloud Bigtable.
На данный момент в Cassandra, поскольку я многократно читал и писал во время операции с болтом Apache Storm, я реализовал следующий соединитель MyDatabase.scala, который инициирует одноэлементное соединение с базой данных, и использую это соединение внутри болта для чтения и обновления пользователя. таблица с использованием потоковых данных, которые поступают из носика Кафки. Я использовал драйвер Phantom Scala API для Cassandra.
MyDatabase.scala
import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._
object CustomConnector {
val hosts = Seq("localhost")
val connector = ContactPoints(hosts).keySpace(""my_keyspace")
}
class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
object users extends Users with keyspace.Connector
}
object MyDatabase extends MyDatabase(CustomConnector.connector) {
Await.result(MyDatabase.autocreate.future(), 5.seconds)
}
Users.scala
import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._
import scala.concurrent.Future
case class User(
id: String,
items: Map[String, Int]
)
class UsersTable extends CassandraTable[Users, User] {
object id extends StringColumn(this) with PartitionKey[String]
object items extends MapColumn[String, Int](this)
def fromRow(row: Row): User = {
User(
id(row),
items(row)
)
}
}
abstract class Users extends UsersTable with RootConnector {
def store(user: User): Future[ResultSet] = {
insert.value(_.id, user.id).value(_.items, user.items)
.consistencyLevel_=(ConsistencyLevel.ALL)
.future()
}
def getById(id: String): Future[Option[User]] = {
select.where(_.id eqs id).one()
}
}
Конвейер потока данных будет выглядеть так:
- Входящие потоковые данные из Pub/Sub.
- Реализуйте логику в одном parDo, где мы обновим несколько таблиц в Bigtable некоторыми новыми значениями, которые сгенерированы из загруженных данных из Pub/Sub.
Создание связи с Кассандрой довольно прямолинейно, когда вы работаете с Phantom DSL. У меня вопрос: есть ли какая-либо библиотека Equivelant, например Phantom для Google Cloud Bigtable, или какой правильный способ реализовать это с помощью Google Cloud API и Scio (поскольку я буду реализовывать конвейер потока данных с помощью Scala). Кажется, я не могу найти нигде подходящего примера для установления соединения с Bigtable и использования этого соединения внутри конвейера потока данных в Scala.
Спасибо
1 ответ
Способ Beam для совместного использования соединения с базой данных между обработкой нескольких элементов в DoFn
это использовать @Setup
а также @Teardown
методы. Посмотрите исходный код коннектора Beam Cassandra для примера.