как распространить ServiceRequestContext на мой настроенный пул потоков

У меня есть сценарий, который обрабатывает запрос armeria и отправляет какое-то событие в guavaс EventBus. проблема в том, что я теряю контекст при обработке события в обработчике EventBus. Я хочу знать, есть ли способ разрешить обработчику событий доступ ServiceRequestContext.

class EventListener {
    @Subscribe
    public void process(SomeCustomizedClass event) {
        final ServiceRequestContext context = ServiceRequestContext.currentOrNull();
        log.info("process ServiceRequestContext context={}", context);
    }
}

зарегистрируйте обработчик событий.

EventBus eventBus = new AsyncEventBus(ThreadPoolTaskExecutor());
eventBus.register(new EventListener());

вот мой Armeria служба

@Slf4j
public class NameAuthRestApi {
    final NameAuthService nameAuthService;

    @Post("/auth")
    @ProducesJson
    public Mono<RealNameAuthResp> auth(RealNameAuthReq req) {
        return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                              .handle((result, sink) -> {
                                  if (result.isSuccess()) {
                                      // I post an event here, but the event process couldn't access the ServiceRequestContext
                                      // that's would be the problem.
                                      eventBus.post(new SomeCustomizedClass(result));

                                      final RealNameAuthResp realNameAuthResp = new RealNameAuthResp();
                                      realNameAuthResp.setTradeNo(result.getTradeNo());
                                      realNameAuthResp.setSuccess(true);
                                      sink.next(realNameAuthResp);
                                      sink.complete();
                                  } else {
                                      sink.error(new SystemException(ErrorCode.API_ERROR, result.errors()));
                                  }
                              });
    }
}

3 ответа

Вам необходимо сделать:

public Mono<RealNameAuthResp> auth(ServiceRequestContxt ctx, RealNameAuthReq req) {
    // Executed by an EventLoop 1.
    // This thread has the ctx in its thread local.
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          .handle((result, sink) -> {
                              // Executed by another EventLoop 2.
                              // But this doens't.
                              try (SafeCloseable ignord = ctx.push()) {
                                  if (result.isSuccess()) {
                                      ...
                                  } else {
                                      ...
                                  }
                              }
                          });
}

Проблема в том, что метод handle выполняется другим потоком, у которого нет ctx в своем локальном потоке. Итак, вы должны вручную установить ctx.

Вы можете добиться того же эффекта, используя xAsync метод с ctx.eventLoop():

public Mono<RealNameAuthResp> auth(ServiceRequestContxt ctx, RealNameAuthReq req) {
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          .handleAsync((result, sink) -> {
                              if (result.isSuccess()) {
                                  ...
                              } else {
                                  ...
                              }
                          }, ctx.eventLoop());
}

У нас есть два способа решить эту проблему: во-первых, используйте программу-исполнитель с ctx:

ctx.eventLoop().submit(new Task(new Event("eone")));
// If it's blocking task, then we must use ctx.blockingTaskExecutor().

Или распространите ctx вручную:


@Slf4j
public static class Task implements Runnable {
    private final Event event;
    private final ServiceRequestContext ctx;

    Task(Event event) {
        this.event = event;
        ctx = ServiceRequestContext.current();
    }

    @Override
    public void run() {
        try (SafeCloseable ignored = ctx.push()) {
            ...
        }
    }
}

@minwoox, для упрощения мой код будет выглядеть так

public class NameAuthRestApi {
    JobExecutor executor = new JobExecutor();

    @Post("/code")
    public HttpResponse authCode(ServiceRequestContext ctx) {
        try (SafeCloseable ignore = ctx.push()) {
            executor.submit(new Task(new Event("eone")));
        }
        return HttpResponse.of("OK");
    }

    @Getter
    @AllArgsConstructor
    public static class Event {
        private String name;
    }

    @RequiredArgsConstructor
    @Slf4j
    public static class Task implements Runnable {
        final Event event;

        @Override
        public void run() {
            // couldn't access ServiceRequestContext here
            ServiceRequestContext ctx = ServiceRequestContext.currentOrNull();
            log.info("ctx={}, event={}", ctx, event);
        }
    }

    public static class JobExecutor {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        public void submit(Task task) {
            executorService.submit(task);
        }
    }
}

Другие вопросы по тегам