Поток данных / apache beam Триггерное окно по количеству байтов в окне

У меня есть простая работа, которая перемещает данные из sub sub в gcs. Подтема pub - это общая тема с различными типами сообщений различного размера.

Я хочу, чтобы результат был в вертикальном разделе GCS соответственно:

Схема / версия / год / месяц / день /

под этим родительским ключом должна быть группа файлов для этого дня, и файлы должны быть разумного размера, то есть 10-200 МБ

Я использую scio, и я в состоянии групповой операции, чтобы сделать P/SCollection [String, Iterable[Event]], где ключ основан на схеме разделения выше.

Я не могу использовать текстовый приемник по умолчанию, так как они не поддерживают вертикальное разбиение, он может записать только всю коллекцию в одно место. Вместо этого следуйте совету в следующих ответах:

Как мне написать несколько файлов в Apache Beam?

Запись в Google Cloud Storage из PubSub с использованием облачного потока данных с использованием DoFn

Я создал простую функцию, которая записывает мою группу в gcs.

object GcsWriter {

  private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService

  val EXTENSION = ".jsonl.gz"

  //todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
  //maybe beam is aimed at a different use case
  //this is an output 'transform' that writes text files
  //org.apache.beam.sdk.io.TextIO.write().to("output")


  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val byteOutputStream = new ByteArrayOutputStream()
    val compressedStream = new GZIPOutputStream(byteOutputStream)
    compressedStream.write(bytes)
    compressedStream.close()
    byteOutputStream.toByteArray
  }

  def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
    val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
    val compressed = gzip(bytes)
    val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
    gcs.create(blobInfo, compressed)
  }

}

Это работает и пишет файлы, как мне нравится, я использую следующие правила запуска с фиксированными окнами:

val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
  val WINDOW_ELEMENT_MAX_COUNT = 5000
  val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
  val ALLOWED_LATENESS: Duration = Duration.standardHours(1)


  val WINDOW_OPTIONS = WindowOptions(
    trigger = AfterFirst.of(
      ListBuffer(
        AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
        AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
    allowedLateness = ALLOWED_LATENESS,
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )

В основном составной триггер в конце окна в соответствии с водяным знаком или при получении элементов x.

Проблема заключается в том, что исходные данные могут иметь сообщения различного размера. Поэтому, если я выберу фиксированное количество элементов для запуска, я либо:

1) выберите слишком большое число, для больших групп событий это вызовет кучу java на рабочем месте 2) выберите меньшее число, затем я получу несколько крошечных файлов для тихих событий, где я хотел бы накопить больше событий в моем файле.

Я не вижу пользовательского триггера, где я могу передать лямбду, которая выводит метрику для каждого элемента или что-то в этом роде. Есть ли способ, которым я могу реализовать свой собственный триггер для запуска на количество байтов в окне.

Некоторые другие вопросы

Правильно ли я полагаю, что итератор для элементов в каждой группе находится в памяти, а не из памяти? Если бы не я, то мог бы поток от итератора к gcs более эффективным способом памяти

Для моего автора GCS я просто делаю это на карте или в ParDo. Он не реализует приемник вывода файла или выглядит как TextIo. Будут ли проблемы с этой простой реализацией. в документации говорится, что если преобразование выдает исключение, оно просто повторяется (неопределенно для потоковых приложений)

0 ответов

Другие вопросы по тегам