Загрузка файла в БД с помощью Apache Beam

      I have scenario as below: 

I need to load file into Database. But before that I have to verify data is present into Database based on file data. Now suppose I have 5 records in a file then I have to check 5 times into database for separate records. How I can take value dynamically: We have to pass dynamic value instead of 2 in line (preparedStatement.setString(1, "2");)

Здесь мы создаем конвейер потока данных, который загружает данные в базу данных, используя луч apache. Теперь мы создаем объект конвейера и создаем конвейер. используя pcollection, мы сохраняем в базе данных.

       Pipeline p = Pipeline.create(options);
           p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
            .apply(ParDo.of(new FilterHeaderFn(csvHeader)))
          .apply(ParDo.of(new GetRatePlanID()))
           .apply("Format Result", MapElements.into(TypeDescriptors.strings()).via((KV<String, Integer> ABC) ->
           ABC.getKey() +","+ +ABC.getValue()))
           .apply("Write File", TextIO.write().to(options.getOutputFile()).withoutSharding());
    
        // Retrieving data from database
        PCollection<String> data = 
                
                p.apply(JdbcIO.<String>read()
                
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                           "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/XYZ")
                           .withUsername("root")
                            .withPassword("root1234"))
            
                .withQuery("select * from xyz where z = ?")
                .withCoder(StringUtf8Coder.of())
                .withStatementPreparator(new JdbcIO.StatementPreparator() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void setParameters(PreparedStatement preparedStatement) throws Exception {
                        preparedStatement.setString(1, "2");
                        
                    }
                })
                .withRowMapper(new JdbcIO.RowMapper<String>() {
                    private static final long serialVersionUID = 1L;
                    public String mapRow(ResultSet resultSet) throws Exception {
                    return "Symbol: "+resultSet.getInt(1)+"\nPrice: "+resultSet.getString(2)+

                                "\nCompany: "+resultSet.getInt(3);
                    }
                })); 

1 ответ

Как было предложено, наиболее эффективным, вероятно, было бы загрузить весь файл во временную таблицу, а затем выполнить запрос для обновления необходимых строк.

Если это невозможно, вы можете вместо этого прочитать таблицу в Dataflow (т.е. "select * from xyz"), а затем выполните соединение /CoGroupByKey, чтобы сопоставить записи с записями, найденными в вашем файле. Если вы ожидаете, что существующая база данных будет очень большой по сравнению с файлами, которые вы надеетесь загрузить в нее, у вас может быть DoFn, который выполняет запросы к вашей базе данных напрямую с использованием JDBC (возможно, кэшируя соединение в методе DoFn setUp), а не используя JdbcIO.

Другие вопросы по тегам