Ошибка записи потока PubSub в облачное хранилище с использованием потока данных
Использование SCIO из spotify
написать работу для Dataflow
следующие 2 примера e.g1 и e.g2, чтобы написать PubSub
поток в GCS
, но получите следующую ошибку для приведенного ниже кода
ошибка
Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection
Код
object StreamingPubSub {
def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()
val sc = ScioContext(opts)
sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
_ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
}
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))
val result = sc.close()
// CTRL-C to cancel the streaming pipeline
dataflowUtils.waitToFinish(result.internal)
}
}
Возможно, я смешиваю концепцию окна с Bounded PCollection, есть ли способ добиться этого или мне нужно применить какое-то преобразование, чтобы это произошло, любой может помочь в этом
1 ответ
Я верю SCIO saveAsTextFile
внизу использует Dataflow's Write
преобразование, которое поддерживает только ограниченные PCollections. Dataflow пока не предоставляет прямого API для записи неограниченной коллекции PC в Google Cloud Storage, хотя мы исследуем это.
Чтобы сохранить неограниченную коллекцию PC где-нибудь, рассмотрим, например, BigQuery, Datastore или Bigtable. В API SCIO вы можете использовать, например, saveAsBigQuery
,