Остановка вызываемого в MvcAsyncTask

У меня есть контроллер с WebAsyncTask. Далее я использую обратный вызов тайм-аута. Как написано здесь, у меня будет возможность уведомить вызываемый объект об отмене обработки. Однако я не вижу никакой возможности сделать это.

@Controller
public class UserDataProviderController {

    private static final Logger log = LoggerFactory.getLogger(UserDataProviderController.class.getName());

    @Autowired
    private Collection<UserDataService> dataServices;

       @RequestMapping(value = "/client/{socialSecurityNumber}", method = RequestMethod.GET)
        public @ResponseBody
        WebAsyncTask<ResponseEntity<CustomDataResponse>> process(@PathVariable final String socialSecurityNumber) {

            final Callable<ResponseEntity<CustomDataResponse>> callable = new Callable<ResponseEntity<CustomDataResponse>>() {

                @Override
                public ResponseEntity<CustomDataResponse> call() throws Exception {

                    CustomDataResponse CustomDataResponse = CustomDataResponse.newInstance();

                    // Find user data
                    for(UserDataService dataService:dataServices)
                    {
                        List<? extends DataClient> clients = dataService.findBySsn(socialSecurityNumber);
                        CustomDataResponse.put(dataService.getDataSource(), UserDataConverter.convert(clients));
                    }

                    // test long execution
                    Thread.sleep(4000);

                    log.info("Execution thread continued and shall be terminated:"+Thread.currentThread().getName());


                    HttpHeaders responseHeaders = new HttpHeaders();
                    responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
                    return new ResponseEntity(CustomDataResponse,responseHeaders,HttpStatus.OK);
                }

            };

            final Callable<ResponseEntity<CustomDataResponse>> callableTimeout = new Callable<ResponseEntity<CustomDataResponse>>() {
                @Override
                public ResponseEntity<CustomDataResponse> call() throws Exception {

                    // Error response
                    HttpHeaders responseHeaders = new HttpHeaders();
                    responseHeaders.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
                    return new ResponseEntity("Request has timed out!",responseHeaders,HttpStatus.INTERNAL_SERVER_ERROR);
                }
            };

            WebAsyncTask<ResponseEntity<CustomDataResponse>> task = new WebAsyncTask<>(3000,callable);
            task.onTimeout(callableTimeout);
            return task;
        }
}

Мой @WebConfig

@Configuration
@EnableWebMvc
class WebAppConfig  extends WebMvcConfigurerAdapter {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setKeepAliveSeconds(60 * 60);
        executor.afterPropertiesSet();

        configurer.registerCallableInterceptors(new TimeoutCallableProcessingInterceptor());
        configurer.setTaskExecutor(executor);
    } 
}

И вполне стандартный Interceptor:

public class TimeoutCallableProcessingInterceptor extends CallableProcessingInterceptorAdapter {

    @Override
    public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) {

        throw new IllegalStateException("[" + task.getClass().getName() + "] timed out");

    }
}

Все работает как надо, но Callable из контроллера всегда завершается, что очевидно, но как остановить там обработку?

1 ответ

Ты можешь использовать WebAsyncTask реализовать контроль времени ожидания и Thread управление, чтобы остановить новый асинхронный поток изящно.

  1. Реализовать Callable запустить процесс
  2. В этом методе (который выполняется в другом потоке) сохраните текущий Thread в локальной переменной контроллера
  3. Реализовать другое Callable обрабатывать событие тайм-аута
  4. В этом методе восстановить ранее сохраненные Thread и прервите его, вызывая interrupt() метод.
  5. Также бросьте TimeoutException остановить процесс контроллера
  6. В процессе выполнения проверьте, не прерывается ли поток Thread.currentThread().isInterrupted(), если это так, то откат транзакции с выдачей исключения.

контроллер:

public WebAsyncTask<ResponseEntity<BookingFileDTO>> confirm(@RequestBody final BookingConfirmationRQDTO bookingConfirmationRQDTO)
        throws AppException,
        ProductException,
        ConfirmationException,
        BeanValidationException {

    final Long startTimestamp = System.currentTimeMillis();
    // The compiler obligates to define the local variable shared with the callable as final array
    final Thread[] asyncTaskThread = new Thread[1];

    /**
     *  Asynchronous execution of the service's task
     *  Implemented without ThreadPool, we're using Tomcat's ThreadPool
     *  To implement an specific ThreadPool take a look at http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#mvc-ann-async-configuration-spring-mvc
     */
    Callable<ResponseEntity<BookingFileDTO>> callableTask = () -> {

        //Stores the thread of the newly started asynchronous task
        asyncTaskThread[0] = Thread.currentThread();

        log.debug("Running saveBookingFile task at `{}`thread", asyncTaskThread[0].getName());
        BookingFileDTO bookingFileDTO = bookingFileService.saveBookingFile(
                bookingConfirmationRQDTO,
                MDC.get(HttpHeader.XB3_TRACE_ID))
                .getValue();
        if (log.isDebugEnabled()) {
            log.debug("The saveBookingFile task took {} ms",
                    System.currentTimeMillis() - startTimestamp);
        }
        return new ResponseEntity<>(bookingFileDTO, HttpStatus.OK);
    };

    /**
     * This method is executed if a timeout occurs
     */
    Callable<ResponseEntity<BookingFileDTO>> callableTimeout = () -> {

        String msg = String.format("Timeout detected at %d ms during confirm operation",
            System.currentTimeMillis() - startTimestamp);
        log.error("Timeout detected at {} ms during confirm operation: informing BookingFileService.", msg);

        // Informs the service that the time has ran out
        asyncTaskThread[0].interrupt();

        // Interrupts the controller call
        throw new TimeoutException(msg);
    };

    WebAsyncTask<ResponseEntity<BookingFileDTO>> webAsyncTask = new WebAsyncTask<>(timeoutMillis, callableTask);
    webAsyncTask.onTimeout(callableTimeout);
    log.debug("Timeout set to {} ms", timeoutMillis);
    return webAsyncTask;
}

Реализация сервиса:

/**
 * If the service has been informed that the time has ran out
 * throws an AsyncRequestTimeoutException to roll-back transactions
 */
private void rollbackOnTimeout() throws TimeoutException {
    if(Thread.currentThread().isInterrupted()) {
        log.error(TIMEOUT_DETECTED_MSG);
        throw new TimeoutException(TIMEOUT_DETECTED_MSG);
    }
}

@Transactional(rollbackFor = TimeoutException.class, propagation = Propagation.REQUIRES_NEW)
DTOSimpleWrapper<BookingFileDTO> saveBookingFile(BookingConfirmationRQDTO bookingConfirmationRQDTO, String traceId) {

    // Database operations
    // ...

    return retValue;
}
Другие вопросы по тегам