Поток данных / apache beam Триггерное окно по количеству байтов в окне
У меня есть простая работа, которая перемещает данные из sub sub в gcs. Подтема pub - это общая тема с различными типами сообщений различного размера.
Я хочу, чтобы результат был в вертикальном разделе GCS соответственно:
Схема / версия / год / месяц / день /
под этим родительским ключом должна быть группа файлов для этого дня, и файлы должны быть разумного размера, то есть 10-200 МБ
Я использую scio, и я в состоянии групповой операции, чтобы сделать P/SCollection [String, Iterable[Event]], где ключ основан на схеме разделения выше.
Я не могу использовать текстовый приемник по умолчанию, так как они не поддерживают вертикальное разбиение, он может записать только всю коллекцию в одно место. Вместо этого следуйте совету в следующих ответах:
Как мне написать несколько файлов в Apache Beam?
Я создал простую функцию, которая записывает мою группу в gcs.
object GcsWriter {
private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService
val EXTENSION = ".jsonl.gz"
//todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
//maybe beam is aimed at a different use case
//this is an output 'transform' that writes text files
//org.apache.beam.sdk.io.TextIO.write().to("output")
def gzip(bytes: Array[Byte]): Array[Byte] = {
val byteOutputStream = new ByteArrayOutputStream()
val compressedStream = new GZIPOutputStream(byteOutputStream)
compressedStream.write(bytes)
compressedStream.close()
byteOutputStream.toByteArray
}
def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
val compressed = gzip(bytes)
val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
gcs.create(blobInfo, compressed)
}
}
Это работает и пишет файлы, как мне нравится, я использую следующие правила запуска с фиксированными окнами:
val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
val WINDOW_ELEMENT_MAX_COUNT = 5000
val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
val ALLOWED_LATENESS: Duration = Duration.standardHours(1)
val WINDOW_OPTIONS = WindowOptions(
trigger = AfterFirst.of(
ListBuffer(
AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
allowedLateness = ALLOWED_LATENESS,
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
)
В основном составной триггер в конце окна в соответствии с водяным знаком или при получении элементов x.
Проблема заключается в том, что исходные данные могут иметь сообщения различного размера. Поэтому, если я выберу фиксированное количество элементов для запуска, я либо:
1) выберите слишком большое число, для больших групп событий это вызовет кучу java на рабочем месте 2) выберите меньшее число, затем я получу несколько крошечных файлов для тихих событий, где я хотел бы накопить больше событий в моем файле.
Я не вижу пользовательского триггера, где я могу передать лямбду, которая выводит метрику для каждого элемента или что-то в этом роде. Есть ли способ, которым я могу реализовать свой собственный триггер для запуска на количество байтов в окне.
Некоторые другие вопросы
Правильно ли я полагаю, что итератор для элементов в каждой группе находится в памяти, а не из памяти? Если бы не я, то мог бы поток от итератора к gcs более эффективным способом памяти
Для моего автора GCS я просто делаю это на карте или в ParDo. Он не реализует приемник вывода файла или выглядит как TextIo. Будут ли проблемы с этой простой реализацией. в документации говорится, что если преобразование выдает исключение, оно просто повторяется (неопределенно для потоковых приложений)