Spring Integration Flow с Jdbc источником сообщений с динамическим запросом

Я пытаюсь сделать сбор данных об изменениях из базы данных oracle, используя поток данных весеннего облака с kafka в качестве брокера. Я использую механизм опроса для этого. Я опрашиваю базу данных с помощью основного запроса на выборку через регулярные промежутки времени для сбора любых обновленных данных. Для лучшей системы защиты от сбоев я сохранил время последнего опроса в oracle DB и использовал его для получения данных, которые обновляются после последнего опроса.

public MessageSource<Object> jdbcMessageSource() {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
    jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow pollingFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
    flowBuilder.channel(this.source.output());
    flowBuilder.transform(trans,"transform");
    return flowBuilder.get();

}

Мои запросы в свойствах приложения следующие:

query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)

update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP

Это работает идеально для меня. Я могу получить CDC из БД с таким подходом.

Проблема, которую я сейчас рассматриваю, ниже:

Создание таблицы только для поддержания времени опроса является чрезмерной нагрузкой. Я ищу ведение этого последнего опроса в теме кафки и извлекать это время из темы кафки, когда я делаю следующий опрос.

Я изменил jdbcMessageSource метод, как показано ниже, чтобы попробовать это:

public MessageSource<Object> jdbcMessageSource() {
    String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";

    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, query);
    return jdbcPollingChannelAdapter;
}

Но Spring Data Flow создает экземпляр компонента pollingFlow() (см. Код выше) только один раз. Следовательно, любой запрос, который выполняется первым, останется прежним. Я хочу обновить запрос с новым временем опроса для каждого опроса.

Есть ли способ, где я могу написать кастом Integrationflow обновлять этот запрос каждый раз, когда я делаю опрос?

Я попробовал IntegrationFlowContext для этого, но не удалось.

Заранее спасибо!!!

3 ответа

Решение

См. Ответ Артема о механизме динамического запроса в стандартном адаптере; альтернативой, однако, было бы просто обернуть JdbcTemplate в бобе и вызвать его с

IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
    ...

или даже простая лямбда

    .from(() -> jdbcTemplate...)

С помощью обоих ответов выше, я смог выяснить подход. Напиши jdbc template и обернуть это как боб и использовать его для Integration Flow,

@EnableBinding(Source.class)
@AllArgsConstructor
public class StockSource {

  private DataSource dataSource;

  @Autowired
  private JdbcTemplate jdbcTemplate;

  private MessageChannelFactory messageChannelFactory;  // You can use normal message channel which is available in spring cloud data flow as well.

  private List<String> findAll() {
    jdbcTemplate = new JdbcTemplate(dataSource);
    String time = "10/24/60" . (this means 10 seconds for oracle DB)
    String query = << your query here like.. select * from test where (last_updated_time > time) >>;
    return jdbcTemplate.query(query, new RowMapper<String>() {
      @Override
      public String mapRow(ResultSet rs, int rowNum) throws SQLException {
          ...
          ...
          any row mapper operations that you want to do with you result after the poll.
          ...
          ...
          ...
        // Change the time here for the next poll to the DB. 
        return result;
      }
    });
  }

  @Bean
  public IntegrationFlow supplyPollingFlow() {

    IntegrationFlowBuilder flowBuilder = IntegrationFlows
        .from(this::findAll, spec -> {
          spec.poller(Pollers.fixedDelay(5000));
        });
    flowBuilder.channel(<<Your message channel>>);
    return flowBuilder.get();
  }

}

В нашем случае мы сохраняли время последнего опроса в теме кафки. Это должно было сделать состояние приложения меньше. Каждый новый опрос в БД теперь будет иметь новое время в where состояние.

PS: ваш брокер по обмену сообщениями (kafka/rabbit mq) должен работать в вашей локальной сети или подключаться к ним, если он размещен на другой платформе.

Боже, скорость!

У нас есть эта тестовая конфигурация (извините, это XML):

<inbound-channel-adapter query="select * from item where status=:status" channel="target"
                             data-source="dataSource" select-sql-parameter-source="parameterSource"
                             update="delete from item"/>


    <beans:bean id="parameterSource" factory-bean="parameterSourceFactory"
                factory-method="createParameterSourceNoCache">
        <beans:constructor-arg value=""/>
    </beans:bean>

    <beans:bean id="parameterSourceFactory"
                class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
        <beans:property name="parameterExpressions">
            <beans:map>
                <beans:entry key="status" value="@statusBean.which()"/>
            </beans:map>
        </beans:property>
        <beans:property name="sqlParameterTypes">
            <beans:map>
                <beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/>
            </beans:map>
        </beans:property>
    </beans:bean>

    <beans:bean id="statusBean"
                class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>

Обратите внимание на ExpressionEvaluatingSqlParameterSourceFactory И его createParameterSourceNoCache() завод. Этот результат может быть использован для select-sql-parameter-source,

JdbcPollingChannelAdapter имеет setSelectSqlParameterSource по вопросу.

Итак, вы настраиваете ExpressionEvaluatingSqlParameterSourceFactory иметь возможность разрешить некоторый параметр запроса как выражение для вызова некоторого метода бобов, чтобы получить желаемое значение от Kafka. затем createParameterSourceNoCache() поможет вам получить ожидаемый SqlParameterSource,

В документации также есть некоторая информация: https://docs.spring.io/spring-integration/docs/current/reference/html/

Другие вопросы по тегам