Остановка вызываемого в MvcAsyncTask
У меня есть контроллер с WebAsyncTask. Далее я использую обратный вызов тайм-аута. Как написано здесь, у меня будет возможность уведомить вызываемый объект об отмене обработки. Однако я не вижу никакой возможности сделать это.
@Controller
public class UserDataProviderController {
private static final Logger log = LoggerFactory.getLogger(UserDataProviderController.class.getName());
@Autowired
private Collection<UserDataService> dataServices;
@RequestMapping(value = "/client/{socialSecurityNumber}", method = RequestMethod.GET)
public @ResponseBody
WebAsyncTask<ResponseEntity<CustomDataResponse>> process(@PathVariable final String socialSecurityNumber) {
final Callable<ResponseEntity<CustomDataResponse>> callable = new Callable<ResponseEntity<CustomDataResponse>>() {
@Override
public ResponseEntity<CustomDataResponse> call() throws Exception {
CustomDataResponse CustomDataResponse = CustomDataResponse.newInstance();
// Find user data
for(UserDataService dataService:dataServices)
{
List<? extends DataClient> clients = dataService.findBySsn(socialSecurityNumber);
CustomDataResponse.put(dataService.getDataSource(), UserDataConverter.convert(clients));
}
// test long execution
Thread.sleep(4000);
log.info("Execution thread continued and shall be terminated:"+Thread.currentThread().getName());
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
return new ResponseEntity(CustomDataResponse,responseHeaders,HttpStatus.OK);
}
};
final Callable<ResponseEntity<CustomDataResponse>> callableTimeout = new Callable<ResponseEntity<CustomDataResponse>>() {
@Override
public ResponseEntity<CustomDataResponse> call() throws Exception {
// Error response
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
return new ResponseEntity("Request has timed out!",responseHeaders,HttpStatus.INTERNAL_SERVER_ERROR);
}
};
WebAsyncTask<ResponseEntity<CustomDataResponse>> task = new WebAsyncTask<>(3000,callable);
task.onTimeout(callableTimeout);
return task;
}
}
Мой @WebConfig
@Configuration
@EnableWebMvc
class WebAppConfig extends WebMvcConfigurerAdapter {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setKeepAliveSeconds(60 * 60);
executor.afterPropertiesSet();
configurer.registerCallableInterceptors(new TimeoutCallableProcessingInterceptor());
configurer.setTaskExecutor(executor);
}
}
И вполне стандартный Interceptor:
public class TimeoutCallableProcessingInterceptor extends CallableProcessingInterceptorAdapter {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) {
throw new IllegalStateException("[" + task.getClass().getName() + "] timed out");
}
}
Все работает как надо, но Callable из контроллера всегда завершается, что очевидно, но как остановить там обработку?
1 ответ
Ты можешь использовать WebAsyncTask
реализовать контроль времени ожидания и Thread
управление, чтобы остановить новый асинхронный поток изящно.
- Реализовать
Callable
запустить процесс - В этом методе (который выполняется в другом потоке) сохраните текущий
Thread
в локальной переменной контроллера - Реализовать другое
Callable
обрабатывать событие тайм-аута - В этом методе восстановить ранее сохраненные
Thread
и прервите его, вызываяinterrupt()
метод. - Также бросьте
TimeoutException
остановить процесс контроллера - В процессе выполнения проверьте, не прерывается ли поток
Thread.currentThread().isInterrupted()
, если это так, то откат транзакции с выдачей исключения.
контроллер:
public WebAsyncTask<ResponseEntity<BookingFileDTO>> confirm(@RequestBody final BookingConfirmationRQDTO bookingConfirmationRQDTO)
throws AppException,
ProductException,
ConfirmationException,
BeanValidationException {
final Long startTimestamp = System.currentTimeMillis();
// The compiler obligates to define the local variable shared with the callable as final array
final Thread[] asyncTaskThread = new Thread[1];
/**
* Asynchronous execution of the service's task
* Implemented without ThreadPool, we're using Tomcat's ThreadPool
* To implement an specific ThreadPool take a look at http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#mvc-ann-async-configuration-spring-mvc
*/
Callable<ResponseEntity<BookingFileDTO>> callableTask = () -> {
//Stores the thread of the newly started asynchronous task
asyncTaskThread[0] = Thread.currentThread();
log.debug("Running saveBookingFile task at `{}`thread", asyncTaskThread[0].getName());
BookingFileDTO bookingFileDTO = bookingFileService.saveBookingFile(
bookingConfirmationRQDTO,
MDC.get(HttpHeader.XB3_TRACE_ID))
.getValue();
if (log.isDebugEnabled()) {
log.debug("The saveBookingFile task took {} ms",
System.currentTimeMillis() - startTimestamp);
}
return new ResponseEntity<>(bookingFileDTO, HttpStatus.OK);
};
/**
* This method is executed if a timeout occurs
*/
Callable<ResponseEntity<BookingFileDTO>> callableTimeout = () -> {
String msg = String.format("Timeout detected at %d ms during confirm operation",
System.currentTimeMillis() - startTimestamp);
log.error("Timeout detected at {} ms during confirm operation: informing BookingFileService.", msg);
// Informs the service that the time has ran out
asyncTaskThread[0].interrupt();
// Interrupts the controller call
throw new TimeoutException(msg);
};
WebAsyncTask<ResponseEntity<BookingFileDTO>> webAsyncTask = new WebAsyncTask<>(timeoutMillis, callableTask);
webAsyncTask.onTimeout(callableTimeout);
log.debug("Timeout set to {} ms", timeoutMillis);
return webAsyncTask;
}
Реализация сервиса:
/**
* If the service has been informed that the time has ran out
* throws an AsyncRequestTimeoutException to roll-back transactions
*/
private void rollbackOnTimeout() throws TimeoutException {
if(Thread.currentThread().isInterrupted()) {
log.error(TIMEOUT_DETECTED_MSG);
throw new TimeoutException(TIMEOUT_DETECTED_MSG);
}
}
@Transactional(rollbackFor = TimeoutException.class, propagation = Propagation.REQUIRES_NEW)
DTOSimpleWrapper<BookingFileDTO> saveBookingFile(BookingConfirmationRQDTO bookingConfirmationRQDTO, String traceId) {
// Database operations
// ...
return retValue;
}