Установление синглтон-соединения с Google Cloud Bigtable в Scala, аналогичного Cassandra

Я пытаюсь внедрить систему рекомендаций в реальном времени, используя облачные сервисы Google. Я уже собрал движок, используя Kafka, Apache Storm и Cassandra, но я хочу создать тот же движок в Scala, используя Cloud Pub/Sub, Cloud Dataflow и Cloud Bigtable.

На данный момент в Cassandra, поскольку я многократно читал и писал во время операции с болтом Apache Storm, я реализовал следующий соединитель MyDatabase.scala, который инициирует одноэлементное соединение с базой данных, и использую это соединение внутри болта для чтения и обновления пользователя. таблица с использованием потоковых данных, которые поступают из носика Кафки. Я использовал драйвер Phantom Scala API для Cassandra.

MyDatabase.scala

import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._


object CustomConnector {

  val hosts = Seq("localhost")

  val connector = ContactPoints(hosts).keySpace(""my_keyspace")

}

class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
  object users extends Users with keyspace.Connector
}

object MyDatabase extends MyDatabase(CustomConnector.connector) {
  Await.result(MyDatabase.autocreate.future(), 5.seconds)
}

Users.scala

import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._

import scala.concurrent.Future

case class User(
                 id: String,
                 items: Map[String, Int]
               )

class UsersTable extends CassandraTable[Users, User] {

  object id extends StringColumn(this) with PartitionKey[String]
  object items extends MapColumn[String, Int](this)

  def fromRow(row: Row): User = {
    User(
      id(row),
      items(row)
    )
  }
}

abstract class Users extends UsersTable with RootConnector {

  def store(user: User): Future[ResultSet] = {
    insert.value(_.id, user.id).value(_.items, user.items)
      .consistencyLevel_=(ConsistencyLevel.ALL)
      .future()
  }

  def getById(id: String): Future[Option[User]] = {
    select.where(_.id eqs id).one()
  }
}

Конвейер потока данных будет выглядеть так:

  1. Входящие потоковые данные из Pub/Sub.
  2. Реализуйте логику в одном parDo, где мы обновим несколько таблиц в Bigtable некоторыми новыми значениями, которые сгенерированы из загруженных данных из Pub/Sub.

Создание связи с Кассандрой довольно прямолинейно, когда вы работаете с Phantom DSL. У меня вопрос: есть ли какая-либо библиотека Equivelant, например Phantom для Google Cloud Bigtable, или какой правильный способ реализовать это с помощью Google Cloud API и Scio (поскольку я буду реализовывать конвейер потока данных с помощью Scala). Кажется, я не могу найти нигде подходящего примера для установления соединения с Bigtable и использования этого соединения внутри конвейера потока данных в Scala.

Спасибо

1 ответ

Способ Beam для совместного использования соединения с базой данных между обработкой нескольких элементов в DoFn это использовать @Setup а также @Teardown методы. Посмотрите исходный код коннектора Beam Cassandra для примера.

Другие вопросы по тегам