Как включить область запроса в асинхронном исполнителе задач

В моем приложении есть несколько асинхронных веб-сервисов. Сервер принимает запрос, возвращает ответ OK и начинает обработку запроса с AsyncTaskExecutor. Мой вопрос заключается в том, как включить область запроса здесь, потому что в этой обработке мне нужно получить класс, который аннотируется:

@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)

Теперь я получаю исключение:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.requestContextImpl': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

потому что он работает в SimpleAsyncTaskExecutor и не в DispatcherServlet

моя асинхронная обработка запроса

taskExecutor.execute(new Runnable() {

    @Override
    public void run() {
        asyncRequest(request);
    }
});

где taskExecutor:

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

10 ответов

Решение

Мы столкнулись с той же проблемой - нужно было выполнять код в фоновом режиме, используя @Async, поэтому он не смог использовать какие-либо бины Session- или RequestScope. Мы решили это следующим образом:

  • Создайте пользовательский TaskPoolExecutor, который хранит информацию о границах задач
  • Создайте специальный Callable (или Runnable), который использует информацию, чтобы установить и очистить контекст для фонового потока
  • Создайте конфигурацию переопределения для использования пользовательского исполнителя

Примечание: это будет работать только для bean-компонентов Session и Request, но не для контекста безопасности (как в Spring Security). Вам придется использовать другой метод для установки контекста безопасности, если это то, что вам нужно.

Примечание 2: для краткости показаны только реализация Callable и submit (). Вы можете сделать то же самое для Runnable и execute ().

Вот код:

Исполнитель:

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

Callable:

public class ContextAwareCallable<T> implements Callable<T> {
    private Callable<T> task;
    private RequestAttributes context;

    public ContextAwareCallable(Callable<T> task, RequestAttributes context) {
        this.task = task;
        this.context = context;
    }

    @Override
    public T call() throws Exception {
        if (context != null) {
            RequestContextHolder.setRequestAttributes(context);
        }

        try {
            return task.call();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }
}

Конфигурация:

@Configuration
public class ExecutorConfig extends AsyncConfigurerSupport {
    @Override
    @Bean
    public Executor getAsyncExecutor() {
        return new ContextAwarePoolExecutor();
    }
}

Самый простой способ - использовать декоратор задач, например так:

static class ContextCopyingDecorator implements TaskDecorator {
    @Nonnull
    @Override
    public Runnable decorate(@Nonnull Runnable runnable) {
        RequestAttributes context =
                RequestContextHolder.currentRequestAttributes();
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                MDC.setContextMap(contextMap);
                runnable.run();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

Чтобы добавить этот декоратор к исполнителю задачи, все, что вам нужно, это добавить его в процедуру конфигурации:

@Override
@Bean
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor poolExecutor = new ThreadPoolTaskExecutor();
    poolExecutor.setTaskDecorator(new ContextCopyingDecorator());
    poolExecutor.initialize();
    return poolExecutor;
}

Нет необходимости в дополнительном держателе или исполнителе пользовательского пула потоков.

Упомянутые ранее решения у меня не работали. Причина, по которой решение не работает, как упоминалось в сообщении @Thilak, заключается в том, что как только исходный родительский поток передал ответ клиенту, объекты запроса могут быть удалены сборщиком мусора. Но после некоторой настройки решения, предоставленного @Armadillo, я смог заставить его работать. Я использую Spring boot 2.2

Вот что я следил.

  • Создайте настраиваемый TaskPoolExecutor, который хранит (после клонирования) ограниченную информацию с задачами.
  • Создайте специальный Callable (или Runnable), который использует клонированную информацию для установки текущих значений контекста и очистки контекста для асинхронного потока.

Исполнитель (То же, что и в сообщении @ Armadillo):

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

Вызываемый:

public class ContextAwareCallable<T> implements Callable<T> {
  private Callable<T> task;
  private final RequestAttributes requestAttributes;

  public ContextAwareCallable(Callable<T> task, RequestAttributes requestAttributes) {
    this.task = task;
    this.requestAttributes = cloneRequestAttributes(requestAttributes);
  }

  @Override
  public T call() throws Exception {
    try {
      RequestContextHolder.setRequestAttributes(requestAttributes);
      return task.call();
    } finally {
        RequestContextHolder.resetRequestAttributes();
    }
  }

  private RequestAttributes cloneRequestAttributes(RequestAttributes requestAttributes){
    RequestAttributes clonedRequestAttribute = null;
    try{
      clonedRequestAttribute = new ServletRequestAttributes(((ServletRequestAttributes) requestAttributes).getRequest(), ((ServletRequestAttributes) requestAttributes).getResponse());
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_REQUEST),RequestAttributes.SCOPE_REQUEST);
        }
      }
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_SESSION).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_SESSION)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_SESSION),RequestAttributes.SCOPE_SESSION);
        }
      }
      if(requestAttributes.getAttributeNames(RequestAttributes.SCOPE_GLOBAL_SESSION).length>0){
        for(String name: requestAttributes.getAttributeNames(RequestAttributes.SCOPE_GLOBAL_SESSION)){
          clonedRequestAttribute.setAttribute(name,requestAttributes.getAttribute(name,RequestAttributes.SCOPE_GLOBAL_SESSION),RequestAttributes.SCOPE_GLOBAL_SESSION);
        }
      }
      return clonedRequestAttribute;
    }catch(Exception e){
      return requestAttributes;
    }
  }
}

Я сделал изменение, чтобы ввести cloneRequestAttributes() для копирования и установки RequestAttribute, чтобы значения оставались доступными даже после того, как исходный родительский поток передал ответ клиенту.

Конфигурация: поскольку существуют другие асинхронные конфигурации, и я не хотел, чтобы это поведение было применимо к другим асинхронным исполнителям, я создал свою собственную конфигурацию исполнителя задач.

@Configuration
@EnableAsync
public class TaskExecutorConfig {

    @Bean(name = "contextAwareTaskExecutor")
    public TaskExecutor getContextAwareTaskExecutor() {
        ContextAwarePoolExecutor taskExecutor = new ConAwarePoolExecutor();
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setThreadNamePrefix("ContextAwareExecutor-");
        return taskExecutor;
    }
}

И, наконец, в методе async я использую имя исполнителя.

    @Async("contextAwareTaskExecutor")
    public void asyncMethod() {

    }

Альтернативное решение:

Мы столкнулись с этой проблемой, пытаясь повторно использовать существующий класс компонента. Хотя решение сделало вид, что оно удобное. Это было бы гораздо проще (клонирование объектов и резервирование пула потоков), если бы мы могли указать соответствующие значения области запроса как параметры метода. В нашем случае мы планируем реорганизовать код таким образом, чтобы класс компонента, который использует bean-компонент с ограниченным объемом запроса и повторно используется из метода async, принимал значения в качестве параметров метода. Компонент с ограниченным объемом запроса удаляется из повторно используемого компонента и перемещается в класс компонента, который вызывает его метод. Чтобы выразить то, что я только что описал, в коде:

Наше текущее состояние:

@Async("contextAwareTaskExecutor")
    public void asyncMethod() {
       reUsableCompoment.executeLogic() //This component uses the request scoped bean.
    }

Отредактированный код:

    @Async("taskExecutor")
    public void asyncMethod(Object requestObject) {
       reUsableCompoment.executeLogic(requestObject); //Request scoped bean is removed from the component and moved to the component class which invokes it menthod.
    }

Невозможно получить объект области запроса в дочернем асинхронном потоке, поскольку исходный родительский поток обработки запросов, возможно, уже передал ответ клиенту, и все объекты запроса уничтожены. Одним из способов обработки таких сценариев является использование настраиваемой области видимости, например SimpleThreadScope.

Одна проблема с SimpleThreadScope заключается в том, что дочерние потоки не будут наследовать переменные области видимости родителей, поскольку он использует простой ThreadLocal для внутреннего использования. Чтобы преодолеть это, реализуйте пользовательскую область, которая в точности похожа на SimpleThreadScope, но использует InheritableThreadLocal для внутреннего использования. За дополнительной информацией обращайтесь к Spring MVC: Как использовать bean-объект в области запросов внутри порожденного потока?

Ни одно из вышеперечисленных решений не работает для меня, потому что в моем случае родительский поток ответил на запрос обратно клиенту, и на объект с областью запроса нельзя ссылаться ни в каких рабочих потоках.

Я просто поработал, чтобы все вышеперечисленное работало. Я использую Spring Boot 2.2 и customTaskExecutor с ContextAwareCallable, указанным выше.

Асинхронная конфигурация:

@Bean(name = "cachedThreadPoolExecutor")
public Executor cachedThreadPoolExecutor() {

    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ContextAwarePoolExecutor();
    threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
    threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
    threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
    threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
    threadPoolTaskExecutor.setThreadNamePrefix("ThreadName-");
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;

}

ContextAwarePoolExecutor:

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {

   @Override
   public <T> Future<T> submit(Callable<T> task) {
      return super.submit(new ContextAwareCallable(task, RequestContextHolder.currentRequestAttributes()));
   }

   @Override
   public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
     return super.submitListenable(new ContextAwareCallable(task, 
     RequestContextHolder.currentRequestAttributes()));

   }

}

Создан настраиваемый вызываемый объект с учетом контекста:

 public class ContextAwareCallable<T> implements Callable<T> {
   private Callable<T> task;
   private CustomRequestScopeAttributes customRequestScopeAttributes;
   private static final String requestScopedBean = 
  "scopedTarget.requestScopeBeanName";

   public ContextAwareCallable(Callable<T> task, RequestAttributes context) {
    this.task = task;
    if (context != null) {
       //This is Custom class implements RequestAttributes class
        this.customRequestScopeAttributes = new CustomRequestScopeAttributes();

        //Add the request scoped bean to Custom class       
        customRequestScopeAttributes.setAttribute
        (requestScopedBean,context.getAttribute(requestScopedBean,0),0);
        //Set that in RequestContextHolder and set as Inheritable as true 
       //Inheritable is used for setting the attributes in diffrent ThreadLocal objects.
        RequestContextHolder.setRequestAttributes
           (customRequestScopeAttributes,true);
     }
 }

   @Override
   public T call() throws Exception {
   try {
      return task.call();
    } finally {
        customRequestScopeAttributes.removeAttribute(requestScopedBean,0);
    }
   }
}

Пользовательский класс:

public class CustomRequestScopeAttributes implements RequestAttributes { 

  private Map<String, Object> requestAttributeMap = new HashMap<>();
  @Override
  public Object getAttribute(String name, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        return this.requestAttributeMap.get(name);
    }
    return null;
}
@Override
public void setAttribute(String name, Object value, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST){
        this.requestAttributeMap.put(name, value);
    }
}
@Override
public void removeAttribute(String name, int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        this.requestAttributeMap.remove(name);
    }
}
@Override
public String[] getAttributeNames(int scope) {
    if(scope== RequestAttributes.SCOPE_REQUEST) {
        return this.requestAttributeMap.keySet().toArray(new String[0]);
    }
    return  new String[0];
 }
 //Override all methods in the RequestAttributes Interface.

}

Наконец, добавьте аннотацию Async в нужный метод.

  @Async("cachedThreadPoolExecutor")    
  public void asyncMethod() {     
     anyService.execute() //This Service execution uses request scoped bean
  }

С Spring-boot-2.0.3.REALEASE / spring-web-5.0.7 я придумал следующий код, работающий для @Async

Класс, содержащий контекст ThreadLocal.

import java.util.Map;

public class ThreadContextHolder {
  private ThreadContextHolder() {}

  private static final ThreadLocal<Map<String, Object>> ctx = new ThreadLocal<>();

  public static Map<String, Object> getContext() {
    return ctx.get();
  }

  public static void setContext(Map<String, Object> attrs) {
    ctx.set(attrs);
  }

  public static void removeContext() {
    ctx.remove();
  }
}

Асинхронная конфигурация:

      @Bean
      public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       ...
       ...

        executor.setTaskDecorator(
            runnable -> {
              RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); // or currentRequestAttributes() if you want to fall back to JSF context.
              Map<String, Object> map =
                  Arrays.stream(requestAttributes.getAttributeNames(0))
                      .collect(Collectors.toMap(r -> r, r -> requestAttributes.getAttribute(r, 0)));
              return () -> {
                try {
                  ThreadContextHolder.setContext(map);
                  runnable.run();
                } finally {
                  ThreadContextHolder.removeContext();
                }
              };
            });

        executor.initialize();
        return executor;
      }

И из метода async:

@Async
  public void asyncMethod() {
    logger.info("{}", ThreadContextHolder.getContext().get("key"));
  }

@Armadillo

  1. Сработало у меня, большое спасибо.

  2. Что касается Spring Security Context, есть более готовое решение, и оно сработало для меня (найдено здесь Как настроить стратегию Spring Security SecurityContextHolder?)

Чтобы использовать SecurityContextHolder в дочерних потоках:

@Bean
public MethodInvokingFactoryBean methodInvokingFactoryBean() {
    MethodInvokingFactoryBean methodInvokingFactoryBean = new MethodInvokingFactoryBean();
    methodInvokingFactoryBean.setTargetClass(SecurityContextHolder.class);
    methodInvokingFactoryBean.setTargetMethod("setStrategyName");
    methodInvokingFactoryBean.setArguments(new String[]{SecurityContextHolder.MODE_INHERITABLETHREADLOCAL});
    return methodInvokingFactoryBean;
}

Ответ @Armadillo побудил меня написать реализацию для Runnable.

Пользовательская реализация для TaskExecutor:

/**
 * This custom ThreadPoolExecutor stores scoped/context information with the tasks.
 */
public class ContextAwareThreadPoolExecutor extends ThreadPoolTaskExecutor {

     @Override
    public Future<?> submit(Runnable task) {
        return super.submit(new ContextAwareRunnable(task, RequestContextHolder.currentRequestAttributes()));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(new ContextAwareRunnable(task, RequestContextHolder.currentRequestAttributes()));
    }
}

Пользовательская реализация для Runnable:

/**
 * This custom Runnable class can use to make background threads context aware.
 * It store and clear the context for the background threads.
 */
public class ContextAwareRunnable implements Runnable {
    private Runnable task;
    private RequestAttributes context;

    public ContextAwareRunnable(Runnable task, RequestAttributes context) {
        this.task = task;
        // Keeps a reference to scoped/context information of parent thread.
        // So original parent thread should wait for the background threads. 
        // Otherwise you should clone context as @Arun A's answer
        this.context = context;
    }

    @Override
    public void run() {
        if (context != null) {
            RequestContextHolder.setRequestAttributes(context);
        }

        try {
            task.run();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }
}

Я решил эту проблему, добавив следующую конфигурацию bean-компонента

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="request">
                <bean class="org.springframework.context.support.SimpleThreadScope"/>
            </entry>
        </map>
    </property>
</bean>

Обновление: указанное выше решение не очищает объекты, связанные с потоками, как указано в документации Spring. Эта альтернатива работает для меня: https://www.springbyexample.org/examples/custom-thread-scope-module.html

Вот связанный ответ для тех, кто хочет использовать RequestScope с неблокирующими командами ввода-вывода в API, в отличие от раскручивания дочерних потоков, которые живут после исходного HTTP-запроса.

ЦЕЛИ

Цели, к которым стремится большинство людей:

  • Избегайте блокировки потоков в API Java во время ввода-вывода для повышения эффективности.
  • Технически простой код

ПОВЕДЕНИЕ NODEJS / .NET

Довольно стандартно писать такой код в других стеках технологий API, чтобы потоки API могли возвращаться в пул потоков, ожидая завершения ввода-вывода. Мне потребовалось время, чтобы понять, как сделать то же самое на Java:

      // Start on thread 3
const data = await database.getInfo(sql);

// Then run this on thread 9
await downstreamService.postInfo(data);

// Finish on thread 7

Как NodeJS, так и .Net Core хранят объекты с областью запроса в объекте HttpRequest, чтобы данные могли перемещаться между потоками. Они не используют данные ThreadLocal, поскольку они явно не подходят для этого сценария.

ВЕСНА ASYNC ОЖИДАНИЕ ЗАПРОСА ОБЪЕМ

В Spring можно реализовать настраиваемую область, которая хранит объекты с областью запроса в текущем объекте HttpServletRequest, чтобы к объектам можно было получить доступ до и после операторов await:

ИСПОЛЬЗОВАНИЕ И ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ

Надеюсь, приведенные ниже детали будут понятны читателям. Одна из целей моего блога состояла в том, чтобы последовательно реализовать этот тип нефункционального шаблона кодирования в разных стеках технологий, и я не хотел использовать Java WebFlux:

Другие вопросы по тегам