Чтение нескольких файлов.gz и определение того, какая строка принадлежит какому файлу
Я читаю несколько файлов.gz для обработки с использованием потока данных Google. Конечный пункт назначения данных - BigQuery. Таблица BigQuery имеет выделенные столбцы для каждого столбца в CSV-файле в файле.gz. В таблице BQ есть еще один столбец с именем file_name, в котором указано имя файла, к которому принадлежит эта запись. Я читаю файлы с использованием TextIO.Read и выполняю преобразование ParDo на нем. В DoFn есть способ идентифицировать имя файла, которому принадлежит входящая строка.
Мой код выглядит следующим образом:
PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
.from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));
PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {}
Обновление 1:
Я сейчас пытаюсь, как показано ниже:
PCollection<String> fileNamesCollection // this is collection of file names
GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws Exception {
ReadableByteChannel readChannel = channelFactory.open(c.element());
GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
String line = null;
while ((line = br.readLine()) != null) {
c.output(KV.of(c.element(), line));
}
}
}));
Но когда я запускаю эту программу, получаю, что channelFactory не сериализуем, там есть какая-либо фабрика каналов, которая реализует интерфейс Serializable и может использоваться здесь.
Обновление 2: я наконец смог выполнить программу и успешно отправить задание. Спасибо JKFF за помощь. Ниже мой окончательный код, я вставляю его сюда, чтобы он помог другим.
ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setDefaultWorkerLogLevel(Level.WARN);
String jobName = "unique_job_name";
options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);
Pipeline pipeline = Pipeline.create(options);
List<String> filesToProcess = new ArrayList<String>();
for(String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921,Log_20160922 etc
filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath+fileName));
}
// at this time filesToProcess will have all logs files name as Log_2016092101.gz,Log_2016092102.gz,.........,Log_2016092201.gz,Log_2016092223.gz
PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));
PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws Exception {
// I have to create _options here because Options, GcsIOChannelFactory are non serializable
ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
ReadableByteChannel readChannel = channelFactory.open(c.element());
GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
BufferedReader br = new BufferedReader(new InputStreamReader(gzip));
String line = null;
while ((line = br.readLine()) != null) {
c.output(KV.of(c.element(), line));
}
br.close();
gzip.close();
readChannel.close();
}
}));
// Performing reshuffling here as suggested
PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());
PCollection<TableRow> formattedResults = withFileName
.apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws Exception {
KV<String,String> kv = c.element();
String logLine = kv.getValue();
String logFileName = kv.getKey();
// do further processing as you want here
}));
// Finally insert in BQ table the formattedResults
1 ответ
Прямо сейчас ответ - нет. Если вам нужен доступ к именам файлов, к сожалению, лучше всего в этом случае внедрить расширение filepattern и синтаксический анализ файла (как ParDo
). Вот несколько вещей, которые вы должны иметь в виду:
- Обязательно вставьте перераспределение прямо перед разбором
ParDo
, чтобы предотвратить чрезмерное слияние. - Ты можешь использовать
GcsIoChannelFactory
развернуть файл шаблона (см. примеры в этом вопросе) и открытьReadableByteChannel
, Используйте Channels.newInputStream для созданияInputStream
, а затем обернуть его в стандарт JavaGZipInputStream
и прочитайте это построчно - посмотрите этот вопрос для примеров. Не забудьте закрыть потоки.
В качестве альтернативы вы можете написать свой собственный файловый источник. Тем не менее, в этом конкретном случае (файлы.gz) я бы рекомендовал против этого, потому что этот API в первую очередь предназначен для файлов, которые могут быть прочитаны с произвольным доступом с любого смещения.