GCP Dataflow- чтение CSV-файла из хранилища и запись в BigQuery
У меня есть файл CSV в хранилище, и я хочу прочитать его и записать в BigQuery Table. это мой CSV-файл, где первая строка - заголовок:
GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114
это мой код:
public class StorgeBq {
public static class StringToRowConverter extends DoFn<String, TableRow> {
private String[] columnNames;
private boolean isFirstRow = true;
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow();
String[] parts = c.element().split(",");
if (isFirstRow) {
columnNames = Arrays.copyOf(parts, parts.length);
isFirstRow = false;
} else {
for (int i = 0; i < parts.length; i++) {
row.set(columnNames[i], parts[i]);
}
c.output(row);
}
}
}
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setZone("europe-west1-c");
options.setProject("mydata-dev");
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
.apply("ConverToBqRow",ParDo.of(new StringToRowConverter()))
.apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
.to("mydata-dev:DF_TEST.dataflow_table")
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
p.run().waitUntilFinish();
}
}
Есть некоторые проблемы: 1) когда работа начинает выполняться, я вижу, что есть процесс под названием "DropInputs", который я не определил в своем коде!! и начинает работать перед всеми задачами, почему??
2) Почему трубопровод не начинается с первой задачи "ReadLines"? 3) В файле журнала я вижу, что в задаче "WriteToBq" он пытается найти одно из данных в качестве поля, например "Учителя 1-го класса" - это не поле, а данные для "GroupName":
"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",
1 ответ
У вас есть пара проблем в вашем коде. Но, в первую очередь, касаемо этапа "DropInputs" - его можно смело игнорировать. Это был результат этого сообщения об ошибке. Я до сих пор не понимаю, почему он должен отображаться (это сбивает с толку многих наших пользователей), и я хотел бы, чтобы Гуглер вмешался в это. На мой взгляд, это просто беспорядок.
Хорошо, к вашему коду сейчас:
- Вы предполагаете, что первая прочитанная строка будет вашим заголовком. Это неверное предположение. Поток данных читает параллельно, поэтому строка заголовка может появиться в любое время. Вместо использования
boolean
флаг, чтобы проверить, проверитьstring
цените себя каждый раз в вашемParDo
напримерif (c.element.contains("GroupName") then..
- Вам не хватает схемы таблицы BigQuery. Вам нужно добавить
withSchema(..)
к вашей раковине BigQuery. Вот пример из одного из моих публичных конвейеров.