Как Spring Batch CompositeItemWriter управляет транзакциями для авторов делегатов?

В конфигурации шага пакетного задания я планирую выполнить 2 запроса в модуле записи, 1-й запрос - обновить записи в таблице A, а 2-й запрос - снова вставить новые записи в таблицу A.

До сих пор я думаю, что CompositeItemWriter может достичь моей цели выше, то есть мне нужно создать 2 JdbcBatchItemWriters, один для обновления, а другой для вставки.

Мой первый вопрос: подходит ли CompositeItemWriter к вышеуказанным требованиям?

Если да, то это приводит ко второму вопросу о транзакции. Например, если первое обновление прошло успешно, а вторая вставка завершилась неудачно. Будет ли автоматически отменена 1-я транзакция обновления? Иначе как вручную вытащить оба обновления в одной транзакции?

Заранее спасибо!

1 ответ

Решение

Мой первый вопрос: подходит ли CompositeItemWriter к вышеуказанным требованиям?

Да, CompositeItemWriter это путь

Если да, то это приводит ко второму вопросу о транзакции. Например, если первое обновление прошло успешно, а вторая вставка завершилась неудачно. Будет ли автоматически отменена 1-я транзакция обновления? Иначе как вручную вытащить оба обновления в одной транзакции?

Отличный вопрос! Да, если обновление завершается успешно в первом модуле записи, а затем во втором модуле записи происходит сбой, все операторы будут откатываться автоматически. Что вам нужно знать, так это то, что транзакция заключается в выполнении шага тасклета, ориентированного на чанк (и так далее write метод составного элемента записи). Следовательно, выполнение всех SQL-операторов в этом методе (выполняемых в средствах записи делегатов) будет атомарным.

Чтобы проиллюстрировать этот вариант использования, я написал следующий тест:

  • Учитывая стол people с двумя колоннами id а также name только с одной записью внутри: 1,'foo'
  • Представим себе работу, которая читает две записи (1,'foo', 2,'bar') и пытается обновить foo в foo!! а затем вставляет 2,'bar' в таблице. Это сделано с CompositeItemWriter с двумя авторами предметов: UpdateItemWriter а также InsertItemWriter
  • Случай использования заключается в том, что UpdateItemWriter успешно, но InsertItemWriter не удается (выбрасывая исключение)
  • Ожидаемый результат заключается в том, что foo не обновляется до foo!! а также bar не вставляется в таблицу (оба SQL-оператора откатываются из-за исключения в InsertItemWriter)

Вот код (он самодостаточен, так что вы можете попробовать его и посмотреть, как все работает, он использует встроенную базу данных hsqldb, которая должна быть в вашем пути к классам):

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
    }

    @Test
    public void testTransactionRollbackWithCompositeWriter() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount);
        Assert.assertEquals(1, fooCount);
        Assert.assertEquals(0, barCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(0, stepExecution.getCommitCount());
        Assert.assertEquals(1, stepExecution.getRollbackCount());
        Assert.assertEquals(0, stepExecution.getWriteCount());

        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount); // bar is not inserted
        Assert.assertEquals(0, barCount); // bar is not inserted
        Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            return new ListItemReader<>(Arrays.asList(foo, bar));
        }

        @Bean
        public ItemWriter<Person> updateItemWriter() {
            return new UpdateItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> insertItemWriter() {
            return new InsertItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
            return compositeItemWriter;
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory
                            .get("step").<Person, Person>chunk(2)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class UpdateItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public UpdateItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("foo".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                }
            }
        }
    }

    public static class InsertItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public InsertItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                    throw new IllegalStateException("Something went wrong!");
                }
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

В моем примере используются пользовательские элементы записи, но это должно работать с двумя JdbcBatchItemWriterтакже.

Надеюсь, это поможет!

Другие вопросы по тегам