Загрузка файла в БД с помощью Apache Beam
I have scenario as below:
I need to load file into Database. But before that I have to verify data is present into Database based on file data. Now suppose I have 5 records in a file then I have to check 5 times into database for separate records. How I can take value dynamically: We have to pass dynamic value instead of 2 in line (preparedStatement.setString(1, "2");)
Здесь мы создаем конвейер потока данных, который загружает данные в базу данных, используя луч apache. Теперь мы создаем объект конвейера и создаем конвейер. используя pcollection, мы сохраняем в базе данных.
Pipeline p = Pipeline.create(options);
p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new FilterHeaderFn(csvHeader)))
.apply(ParDo.of(new GetRatePlanID()))
.apply("Format Result", MapElements.into(TypeDescriptors.strings()).via((KV<String, Integer> ABC) ->
ABC.getKey() +","+ +ABC.getValue()))
.apply("Write File", TextIO.write().to(options.getOutputFile()).withoutSharding());
// Retrieving data from database
PCollection<String> data =
p.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/XYZ")
.withUsername("root")
.withPassword("root1234"))
.withQuery("select * from xyz where z = ?")
.withCoder(StringUtf8Coder.of())
.withStatementPreparator(new JdbcIO.StatementPreparator() {
private static final long serialVersionUID = 1L;
@Override
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setString(1, "2");
}
})
.withRowMapper(new JdbcIO.RowMapper<String>() {
private static final long serialVersionUID = 1L;
public String mapRow(ResultSet resultSet) throws Exception {
return "Symbol: "+resultSet.getInt(1)+"\nPrice: "+resultSet.getString(2)+
"\nCompany: "+resultSet.getInt(3);
}
}));
1 ответ
Как было предложено, наиболее эффективным, вероятно, было бы загрузить весь файл во временную таблицу, а затем выполнить запрос для обновления необходимых строк.
Если это невозможно, вы можете вместо этого прочитать таблицу в Dataflow (т.е.
"select * from xyz"
), а затем выполните соединение /CoGroupByKey, чтобы сопоставить записи с записями, найденными в вашем файле. Если вы ожидаете, что существующая база данных будет очень большой по сравнению с файлами, которые вы надеетесь загрузить в нее, у вас может быть DoFn, который выполняет запросы к вашей базе данных напрямую с использованием JDBC (возможно, кэшируя соединение в методе DoFn setUp), а не используя JdbcIO.