Почему исключение в Spring Batch AsycItemProcessor обнаружено методом onSkipInWrite SkipListener?
Я пишу приложение Spring Boot, которое запускается, собирает и преобразует миллионы записей базы данных в новый оптимизированный формат JSON, а затем отправляет их все в тему GCP PubSub. Я пытаюсь использовать для этого Spring Batch, но у меня возникают проблемы с реализацией отказоустойчивости для моего процесса. База данных изобилует проблемами качества данных, и иногда мои преобразования в JSON не работают. При возникновении сбоев я не хочу, чтобы задание немедленно завершалось, я хочу, чтобы оно продолжало обрабатывать как можно больше записей и перед завершением сообщать, какие именно записи завершились ошибкой, чтобы я или моя команда могли их изучить. проблемные записи в базе данных.
Для этого я попытался использовать интерфейс SkipListener Spring Batch. Но я также использую в своем процессе AsyncItemProcessor и AsyncItemWriter, и хотя исключения возникают во время обработки, SkipListeneronSkipInWrite()
ловит их, а не onSkipInProcess()
метод. И, к сожалению,onSkipInWrite()
метод не имеет доступа к исходному объекту базы данных, поэтому я не могу сохранить его идентификатор в моем списке проблемных записей БД.
Я что-то неправильно сконфигурировал? Есть ли другой способ получить доступ к объектам от считывателя, который не прошел этап обработки AsynItemProcessor?
Вот что я пробовал...
У меня есть одноэлементный компонент Spring, в котором я храню, сколько записей БД я успешно обработал, а также до 20 проблемных записей в базе данных.
@Component
@Getter //lombok
public class ProcessStatus {
private int processed;
private int failureCount;
private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();
public void incrementProgress { processed++; }
public void logUnexpectedFailure(UnexpectedFailure failure) {
failureCount++;
unexpectedFailure.add(failure);
}
@Getter
@AllArgsConstructor
public static class UnexpectedFailure {
private Throwable error;
private DBProjection dbData;
}
}
У меня есть пакет Spring Skip Listener, который должен обнаруживать сбои и соответствующим образом обновлять мой компонент состояния:
@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DBProjection, Future<JsonMessage>> {
private ProcessStatus processStatus;
@Override
public void onSkipInRead(Throwable error) {}
@Override
public void onSkipInProcess(DBProjection dbData, Throwable error) {
processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
}
@Override
public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
//This is getting called instead!! Even though the exception happened during processing :(
//But I have no access to the original DBProjection data here, and messageFuture.get() gives me null.
}
}
А потом я настроил свою работу так:
@Configuration
public class ConversionBatchJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private TaskExecutor processThreadPool;
@Bean
public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
@StepScope
public ItemStreamReader<DbProjection> dbReader(
MyDomainRepository myDomainRepository,
@Value("#{jobParameters[pageSize]}") Integer pageSize,
@Value("#{jobParameters[limit]}") Integer limit) {
RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
myDomainRepositoryReader.setRepository(myDomainRepository);
myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
add("ACTIVE");
}});
myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
put("update_date", Sort.Direction.ASC);
}});
myDomainRepositoryReader.setPageSize(pageSize);
myDomainRepositoryReader.setMaxItemCount(limit);
// myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
return myDomainRepositoryReader;
}
@Bean
@StepScope
public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalSerivice dataRetrievalService) {
//Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
return new DbProjectionToJsonMessageConverter(dataRetrievalService);
}
@Bean
@StepScope
public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
asyncDataConverter.setDelegate(dataConverter);
asyncDataConverter.setTaskExecutor(processThreadPool);
asyncDataConverter.afterPropertiesSet();
return asyncDataConverter;
}
@Bean
@StepScope
public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
return new JsonMessageWriter(publisherService);
}
@Bean
@StepScope
public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
asyncJsonPublisher.setDelegate(jsonPublisher);
asyncJsonPublisher.afterPropertiesSet();
return asyncJsonPublisher;
}
@Bean
public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
ItemStreamReader<DbProjection> dbReader,
AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
AsyncItemWriter<JsonMessage> asyncJsonPublisher,
ProcessStatus processStatus,
@Value("${conversion.failure.limit:20}") int maximumFailures) {
return stepBuilderFactory.get("conversionProcess")
.<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
.reader(dbReader)
.processor(asyncDataConverter)
.writer(asyncJsonPublisher)
.faultTolerant()
.skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
// ^ for now this returns true for everything until 20 failures
.listener(new ConversionSkipListener(processStatus))
.build();
}
@Bean
public Job conversionJob(Step conversionProcess) {
return jobBuilderFactory.get("conversionJob")
.start(conversionProcess)
.build();
}
}
1 ответ
Это потому, что будущее, окутанное AsyncItemProcessor
разворачивается только в AsyncItemWriter
, поэтому любое исключение, которое может произойти в это время, рассматривается как исключение записи, а не исключение обработки. Вот почемуonSkipInWrite
называется вместо onSkipInProcess
.
На самом деле это известное ограничение этого шаблона, которое задокументировано в Javadoc для AsyncItemProcessor, вот выдержка:
Because the Future is typically unwrapped in the ItemWriter,
there are lifecycle and stats limitations (since the framework doesn't know
what the result of the processor is).
While not an exhaustive list, things like StepExecution.filterCount will not
reflect the number of filtered items and
itemProcessListener.onProcessError(Object, Exception) will not be called.
Javadoc заявляет, что список не является исчерпывающим, и побочный эффект в отношении SkipListener
вы испытываете одно из этих ограничений.