Spring batch jpaPagingItemReader, почему некоторые строки не читаются?

Я использую Spring Batch(3.0.1.RELEASE) / JPA и базу данных сервера HSQLBD. Мне нужно просмотреть всю таблицу (используя пейджинг) и обновить элементы (один за другим). Поэтому я использовал jpaPagingItemReader. Но когда я запускаю задание, я вижу, что некоторые строки пропущены, а количество пропущенных строк равно размеру страницы. Например, если в моей таблице 12 строк и jpaPagingItemReader.pagesize = 3, задание будет читать: строки 1,2,3, затем строки 7,8,9 (поэтому пропустите строки 4,5,6)… Не могли бы вы сказать, что неправильно в моем коде / конфигурации, или, возможно, это проблема с подкачкой HSQLDB? Ниже мой код:

[РЕДАКТИРОВАТЬ]: Проблема в моем ItemProcessor, который выполняет модификацию объектов POJO. Так как JPAPagingItemReader сделал сброс между каждым чтением, сущности обновляются ((это то, что я хочу). Но кажется, что подкачка курсора также увеличивается (как можно видеть в журнале: идентификаторы строк 4, 5 и 6 были пропущено). Как я могу решить эту проблему?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
@JobScope 
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
    JpaPagingItemReader reader = new JpaPagingItemReader();
    reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
    reader.setPageSize(3));
    return reader;
}
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}  

от моего ItemProcessor:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    //do some stuff …
   //then if (condition) update the Entity pojo :   
   item.setModificationDate(new Timestamp(System.currentTimeMillis());
   item.setIsactive = false;

}

из конфигурационного файла Spring xml:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

трассировка / журнал обобщен:

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]

4 ответа

Решение

org.springframework.batch.item.database.JpaPagingItemReader создает собственный экземпляр entityManager.

(из org.springframework.batch.item.database.JpaPagingItemReader#doOpen):

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

Если вы находитесь в транзакции, как кажется, сущности читателя не отсоединяются (от org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

По этой причине, когда вы обновляете элемент в процессоре или записывающем устройстве, этот элемент по-прежнему управляется считывателем entityManager.

Когда читатель элемента читает следующую порцию данных, он сбрасывает контекст в базу данных.

Итак, если мы посмотрим на ваш случай, после первой части процессов обработки данных, мы имеем в базе данных:

|id|active
|1 | false
|2 | false
|3 | false

org.springframework.batch.item.database.JpaPagingItemReader использует limit & offset для извлечения разбитых на страницы данных. Итак, следующий выбор, созданный читателем, выглядит так:

select * from table where active = true offset 3 limits 3. 

Читатель пропустит элементы с идентификатором 4,5,6, потому что теперь они являются первыми строками, полученными из базы данных.

В качестве обходного пути вы можете использовать реализацию jdbc (org.springframework.batch.item.database.JdbcPagingItemReader), так как она не использует limit & offset. Он основан на отсортированном столбце (обычно столбце идентификатора), поэтому вы не пропустите никаких данных. Конечно, вам придется обновить данные в пишущем устройстве (используя либо JPA, либо чистую реализацию JDBC)

Читатель будет более многословным:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;

Я столкнулся с тем же случаем, мой читатель был JpaPagingItemReader, который запрашивал поле, которое было обновлено в модуле записи. Следовательно, пропуская половину элементов, которые необходимо было обновить, из-за того, что окно страницы прогрессировало, а уже прочитанные элементы больше не были доступны читателю.

Самым простым обходным путем для меня было переопределить метод getPage в JpaPagingItemReader, чтобы всегда возвращать первую страницу.

JpaPagingItemReader<XXXXX> jpaPagingItemReader = new JpaPagingItemReader() {
    @Override
    public int getPage() {
        return 0;
    }
};

Несколько вещей, чтобы отметить:

  1. Все объекты, которые возвращаются из JpaPaginingItemReader отделены. Мы осуществляем это одним из двух способов. Мы либо создаем транзакцию перед запросом страницы, затем фиксируем транзакцию (которая отсоединяет все сущности, связанные с EntityManager для этой транзакции) или мы явно называем entityManager.detach, Мы делаем это так, чтобы такие функции, как повтор и пропуск могли быть выполнены правильно.
  2. Хотя вы не разместили весь код в своем процессоре, я думаю, что в //do some stuff раздел, ваш элемент повторно прикрепляется, поэтому происходит обновление. Однако, не имея возможности увидеть этот код, я не могу быть уверен.
  3. В любом случае, используя явный ItemWriter должно быть сделано. На самом деле, я считаю ошибкой то, что нам не требуется ItemWriter при использовании конфигурации Java (мы делаем для XML).
  4. Для вашей конкретной проблемы пропущенных записей, вы должны иметь в виду, что курсор не используется ни одним из *PagingItemReaders. Все они выполняют независимые запросы для каждой страницы данных. Таким образом, если вы обновите базовые данные между каждой страницей, это может повлиять на элементы, возвращаемые на будущих страницах. Например, если в моем поисковом запросе указано where val1 > 4 и у меня есть запись, что val1 был от 1 до 5, в блоке 2 этот элемент может быть возвращен, поскольку теперь он соответствует критериям. Если вам нужно обновить значения, которые содержатся в предложении where (что влияет на то, что попадает в набор данных, которые вы будете обрабатывать), лучше всего добавить какой-либо обработанный флаг, по которому вы можете запросить вместо этого.

У меня была та же проблема с пропускаемыми строками в зависимости от размера страницы. Например, если у меня установлено значение pageSize, равное 2, оно будет читать 2, игнорировать 2, читать 2, игнорировать 2 и т. Д.

Я создавал процессор демона для опроса таблицы базы данных "Запрос" на наличие записей в состоянии "Ожидание обработки". Демон предназначен для вечной работы в фоновом режиме.

У меня было поле "status", которое было определено в @NamedQuery, и я выбирал записи со статусом "10": ожидание обработки. После обработки записи поле состояния будет изменено на "20": ошибка или "30": успешно. Это оказалось причиной проблемы - я обновлял поле, которое было определено в запросе. Если бы я ввел "обработанное поле" и обновил его вместо поля "статус", то проблем не было - все записи были бы прочитаны.

В качестве возможного решения для обновления поля состояния я устанавливаю значение MaxItemCount таким же, как PageSize; это обновило записи правильно до завершения шага. Затем я продолжаю выполнять этот шаг, пока не будет сделан запрос на остановку демона. Хорошо, вероятно, не самый эффективный способ сделать это (но я все еще извлекаю выгоду из простоты использования, которую обеспечивает JPA), но я думаю, что было бы, вероятно, лучше использовать JdbcPagingItemReader (описанный выше - спасибо!). Мнения о наилучшем подходе к этой проблеме пакетного опроса базы данных приветствуются:)

Другие вопросы по тегам