Spring batch jpaPagingItemReader, почему некоторые строки не читаются?
Я использую Spring Batch(3.0.1.RELEASE) / JPA и базу данных сервера HSQLBD. Мне нужно просмотреть всю таблицу (используя пейджинг) и обновить элементы (один за другим). Поэтому я использовал jpaPagingItemReader. Но когда я запускаю задание, я вижу, что некоторые строки пропущены, а количество пропущенных строк равно размеру страницы. Например, если в моей таблице 12 строк и jpaPagingItemReader.pagesize = 3, задание будет читать: строки 1,2,3, затем строки 7,8,9 (поэтому пропустите строки 4,5,6)… Не могли бы вы сказать, что неправильно в моем коде / конфигурации, или, возможно, это проблема с подкачкой HSQLDB? Ниже мой код:
[РЕДАКТИРОВАТЬ]: Проблема в моем ItemProcessor, который выполняет модификацию объектов POJO. Так как JPAPagingItemReader сделал сброс между каждым чтением, сущности обновляются ((это то, что я хочу). Но кажется, что подкачка курсора также увеличивается (как можно видеть в журнале: идентификаторы строк 4, 5 и 6 были пропущено). Как я могу решить эту проблему?
@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
@Inject
private InfrastructureConfiguration infrastructureConfiguration;
@Inject private JobBuilderFactory jobs;
@Inject private StepBuilderFactory steps;
@Bean public Job job() {
return jobs.get("Myjob1").start(step1()).build();
}
@Bean public Step step1() {
return steps.get("step1")
.<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
.reader(reader()).processor(processor()).build();
}
@Bean(destroyMethod = "")
@JobScope
public ItemStreamReader<SNUserPerCampaign> reader() String trigramme) {
JpaPagingItemReader reader = new JpaPagingItemReader();
reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
reader.setPageSize(3));
return reader;
}
@Bean @JobScope
public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {
return new MyItemProcessor();
}
}
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
@Inject private EntityManagerFactory emf;
@Override
public EntityManagerFactory getEntityManagerFactory() {
return emf;
}
}
от моего ItemProcessor:
@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
//do some stuff …
//then if (condition) update the Entity pojo :
item.setModificationDate(new Timestamp(System.currentTimeMillis());
item.setIsactive = false;
}
из конфигурационного файла Spring xml:
<tx:annotation-driven transaction-manager="transactionManager" />
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
<property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>
<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.hsqldb.jdbcDriver" />
<property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
<property name="username" value="sa" />
<property name="password" value="" />
</bean>
трассировка / журнал обобщен:
11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]
11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN set ...etc...
11:16:06.681 DEBUG SQL- select ...etc... from SNUSER_CAMPAIGN snuserperc0_
11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]
4 ответа
org.springframework.batch.item.database.JpaPagingItemReader создает собственный экземпляр entityManager.
(из org.springframework.batch.item.database.JpaPagingItemReader#doOpen):
entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
Если вы находитесь в транзакции, как кажется, сущности читателя не отсоединяются (от org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):
if (!transacted) {
List<T> queryResult = query.getResultList();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}//end if
} else {
results.addAll(query.getResultList());
tx.commit();
}
По этой причине, когда вы обновляете элемент в процессоре или записывающем устройстве, этот элемент по-прежнему управляется считывателем entityManager.
Когда читатель элемента читает следующую порцию данных, он сбрасывает контекст в базу данных.
Итак, если мы посмотрим на ваш случай, после первой части процессов обработки данных, мы имеем в базе данных:
|id|active
|1 | false
|2 | false
|3 | false
org.springframework.batch.item.database.JpaPagingItemReader использует limit & offset для извлечения разбитых на страницы данных. Итак, следующий выбор, созданный читателем, выглядит так:
select * from table where active = true offset 3 limits 3.
Читатель пропустит элементы с идентификатором 4,5,6, потому что теперь они являются первыми строками, полученными из базы данных.
В качестве обходного пути вы можете использовать реализацию jdbc (org.springframework.batch.item.database.JdbcPagingItemReader), так как она не использует limit & offset. Он основан на отсортированном столбце (обычно столбце идентификатора), поэтому вы не пропустите никаких данных. Конечно, вам придется обновить данные в пишущем устройстве (используя либо JPA, либо чистую реализацию JDBC)
Читатель будет более многословным:
@Bean
public ItemReader<? extends Entity> reader() {
JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
sqlPagingQueryProviderFactoryBean.setSortKey("id");
try {
reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
} catch (Exception e) {
e.printStackTrace();
}
reader.setDataSource(dataSource);
reader.setPageSize(3);
reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
return reader;
Я столкнулся с тем же случаем, мой читатель был JpaPagingItemReader, который запрашивал поле, которое было обновлено в модуле записи. Следовательно, пропуская половину элементов, которые необходимо было обновить, из-за того, что окно страницы прогрессировало, а уже прочитанные элементы больше не были доступны читателю.
Самым простым обходным путем для меня было переопределить метод getPage в JpaPagingItemReader, чтобы всегда возвращать первую страницу.
JpaPagingItemReader<XXXXX> jpaPagingItemReader = new JpaPagingItemReader() {
@Override
public int getPage() {
return 0;
}
};
Несколько вещей, чтобы отметить:
- Все объекты, которые возвращаются из
JpaPaginingItemReader
отделены. Мы осуществляем это одним из двух способов. Мы либо создаем транзакцию перед запросом страницы, затем фиксируем транзакцию (которая отсоединяет все сущности, связанные сEntityManager
для этой транзакции) или мы явно называемentityManager.detach
, Мы делаем это так, чтобы такие функции, как повтор и пропуск могли быть выполнены правильно. - Хотя вы не разместили весь код в своем процессоре, я думаю, что в
//do some stuff
раздел, ваш элемент повторно прикрепляется, поэтому происходит обновление. Однако, не имея возможности увидеть этот код, я не могу быть уверен. - В любом случае, используя явный
ItemWriter
должно быть сделано. На самом деле, я считаю ошибкой то, что нам не требуетсяItemWriter
при использовании конфигурации Java (мы делаем для XML). - Для вашей конкретной проблемы пропущенных записей, вы должны иметь в виду, что курсор не используется ни одним из
*PagingItemReader
s. Все они выполняют независимые запросы для каждой страницы данных. Таким образом, если вы обновите базовые данные между каждой страницей, это может повлиять на элементы, возвращаемые на будущих страницах. Например, если в моем поисковом запросе указаноwhere val1 > 4
и у меня есть запись, что val1 был от 1 до 5, в блоке 2 этот элемент может быть возвращен, поскольку теперь он соответствует критериям. Если вам нужно обновить значения, которые содержатся в предложении where (что влияет на то, что попадает в набор данных, которые вы будете обрабатывать), лучше всего добавить какой-либо обработанный флаг, по которому вы можете запросить вместо этого.
У меня была та же проблема с пропускаемыми строками в зависимости от размера страницы. Например, если у меня установлено значение pageSize, равное 2, оно будет читать 2, игнорировать 2, читать 2, игнорировать 2 и т. Д.
Я создавал процессор демона для опроса таблицы базы данных "Запрос" на наличие записей в состоянии "Ожидание обработки". Демон предназначен для вечной работы в фоновом режиме.
У меня было поле "status", которое было определено в @NamedQuery, и я выбирал записи со статусом "10": ожидание обработки. После обработки записи поле состояния будет изменено на "20": ошибка или "30": успешно. Это оказалось причиной проблемы - я обновлял поле, которое было определено в запросе. Если бы я ввел "обработанное поле" и обновил его вместо поля "статус", то проблем не было - все записи были бы прочитаны.
В качестве возможного решения для обновления поля состояния я устанавливаю значение MaxItemCount таким же, как PageSize; это обновило записи правильно до завершения шага. Затем я продолжаю выполнять этот шаг, пока не будет сделан запрос на остановку демона. Хорошо, вероятно, не самый эффективный способ сделать это (но я все еще извлекаю выгоду из простоты использования, которую обеспечивает JPA), но я думаю, что было бы, вероятно, лучше использовать JdbcPagingItemReader (описанный выше - спасибо!). Мнения о наилучшем подходе к этой проблеме пакетного опроса базы данных приветствуются:)