Как Spring Batch CompositeItemWriter управляет транзакциями для авторов делегатов?
В конфигурации шага пакетного задания я планирую выполнить 2 запроса в модуле записи, 1-й запрос - обновить записи в таблице A, а 2-й запрос - снова вставить новые записи в таблицу A.
До сих пор я думаю, что CompositeItemWriter может достичь моей цели выше, то есть мне нужно создать 2 JdbcBatchItemWriters, один для обновления, а другой для вставки.
Мой первый вопрос: подходит ли CompositeItemWriter к вышеуказанным требованиям?
Если да, то это приводит ко второму вопросу о транзакции. Например, если первое обновление прошло успешно, а вторая вставка завершилась неудачно. Будет ли автоматически отменена 1-я транзакция обновления? Иначе как вручную вытащить оба обновления в одной транзакции?
Заранее спасибо!
1 ответ
Мой первый вопрос: подходит ли CompositeItemWriter к вышеуказанным требованиям?
Да, CompositeItemWriter
это путь
Если да, то это приводит ко второму вопросу о транзакции. Например, если первое обновление прошло успешно, а вторая вставка завершилась неудачно. Будет ли автоматически отменена 1-я транзакция обновления? Иначе как вручную вытащить оба обновления в одной транзакции?
Отличный вопрос! Да, если обновление завершается успешно в первом модуле записи, а затем во втором модуле записи происходит сбой, все операторы будут откатываться автоматически. Что вам нужно знать, так это то, что транзакция заключается в выполнении шага тасклета, ориентированного на чанк (и так далее write
метод составного элемента записи). Следовательно, выполнение всех SQL-операторов в этом методе (выполняемых в средствах записи делегатов) будет атомарным.
Чтобы проиллюстрировать этот вариант использования, я написал следующий тест:
- Учитывая стол
people
с двумя колоннамиid
а такжеname
только с одной записью внутри:1,'foo'
- Представим себе работу, которая читает две записи (
1,'foo'
,2,'bar'
) и пытается обновитьfoo
вfoo!!
а затем вставляет2,'bar'
в таблице. Это сделано сCompositeItemWriter
с двумя авторами предметов:UpdateItemWriter
а такжеInsertItemWriter
- Случай использования заключается в том, что
UpdateItemWriter
успешно, ноInsertItemWriter
не удается (выбрасывая исключение) - Ожидаемый результат заключается в том, что
foo
не обновляется доfoo!!
а такжеbar
не вставляется в таблицу (оба SQL-оператора откатываются из-за исключения вInsertItemWriter
)
Вот код (он самодостаточен, так что вы можете попробовать его и посмотреть, как все работает, он использует встроенную базу данных hsqldb, которая должна быть в вашем пути к классам):
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
}
@Test
public void testTransactionRollbackWithCompositeWriter() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount);
Assert.assertEquals(1, fooCount);
Assert.assertEquals(0, barCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(0, stepExecution.getCommitCount());
Assert.assertEquals(1, stepExecution.getRollbackCount());
Assert.assertEquals(0, stepExecution.getWriteCount());
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount); // bar is not inserted
Assert.assertEquals(0, barCount); // bar is not inserted
Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
return new ListItemReader<>(Arrays.asList(foo, bar));
}
@Bean
public ItemWriter<Person> updateItemWriter() {
return new UpdateItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> insertItemWriter() {
return new InsertItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> itemWriter() {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
return compositeItemWriter;
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory
.get("step").<Person, Person>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class UpdateItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public UpdateItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("foo".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
}
}
}
}
public static class InsertItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public InsertItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
throw new IllegalStateException("Something went wrong!");
}
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
В моем примере используются пользовательские элементы записи, но это должно работать с двумя JdbcBatchItemWriter
также.
Надеюсь, это поможет!