как распространить ServiceRequestContext на мой настроенный пул потоков
У меня есть сценарий, который обрабатывает запрос armeria и отправляет какое-то событие в
guava
с
EventBus
. проблема в том, что я теряю контекст при обработке события в обработчике EventBus. Я хочу знать, есть ли способ разрешить обработчику событий доступ
ServiceRequestContext
.
class EventListener {
@Subscribe
public void process(SomeCustomizedClass event) {
final ServiceRequestContext context = ServiceRequestContext.currentOrNull();
log.info("process ServiceRequestContext context={}", context);
}
}
зарегистрируйте обработчик событий.
EventBus eventBus = new AsyncEventBus(ThreadPoolTaskExecutor());
eventBus.register(new EventListener());
вот мой
Armeria
служба
@Slf4j
public class NameAuthRestApi {
final NameAuthService nameAuthService;
@Post("/auth")
@ProducesJson
public Mono<RealNameAuthResp> auth(RealNameAuthReq req) {
return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
.handle((result, sink) -> {
if (result.isSuccess()) {
// I post an event here, but the event process couldn't access the ServiceRequestContext
// that's would be the problem.
eventBus.post(new SomeCustomizedClass(result));
final RealNameAuthResp realNameAuthResp = new RealNameAuthResp();
realNameAuthResp.setTradeNo(result.getTradeNo());
realNameAuthResp.setSuccess(true);
sink.next(realNameAuthResp);
sink.complete();
} else {
sink.error(new SystemException(ErrorCode.API_ERROR, result.errors()));
}
});
}
}
3 ответа
Вам необходимо сделать:
public Mono<RealNameAuthResp> auth(ServiceRequestContxt ctx, RealNameAuthReq req) {
// Executed by an EventLoop 1.
// This thread has the ctx in its thread local.
return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
.handle((result, sink) -> {
// Executed by another EventLoop 2.
// But this doens't.
try (SafeCloseable ignord = ctx.push()) {
if (result.isSuccess()) {
...
} else {
...
}
}
});
}
Проблема в том, что метод handle выполняется другим потоком, у которого нет ctx в своем локальном потоке. Итак, вы должны вручную установить ctx.
Вы можете добиться того же эффекта, используя
xAsync
метод с
ctx.eventLoop()
:
public Mono<RealNameAuthResp> auth(ServiceRequestContxt ctx, RealNameAuthReq req) {
return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
.handleAsync((result, sink) -> {
if (result.isSuccess()) {
...
} else {
...
}
}, ctx.eventLoop());
}
У нас есть два способа решить эту проблему: во-первых, используйте программу-исполнитель с ctx:
ctx.eventLoop().submit(new Task(new Event("eone")));
// If it's blocking task, then we must use ctx.blockingTaskExecutor().
Или распространите ctx вручную:
@Slf4j
public static class Task implements Runnable {
private final Event event;
private final ServiceRequestContext ctx;
Task(Event event) {
this.event = event;
ctx = ServiceRequestContext.current();
}
@Override
public void run() {
try (SafeCloseable ignored = ctx.push()) {
...
}
}
}
@minwoox, для упрощения мой код будет выглядеть так
public class NameAuthRestApi {
JobExecutor executor = new JobExecutor();
@Post("/code")
public HttpResponse authCode(ServiceRequestContext ctx) {
try (SafeCloseable ignore = ctx.push()) {
executor.submit(new Task(new Event("eone")));
}
return HttpResponse.of("OK");
}
@Getter
@AllArgsConstructor
public static class Event {
private String name;
}
@RequiredArgsConstructor
@Slf4j
public static class Task implements Runnable {
final Event event;
@Override
public void run() {
// couldn't access ServiceRequestContext here
ServiceRequestContext ctx = ServiceRequestContext.currentOrNull();
log.info("ctx={}, event={}", ctx, event);
}
}
public static class JobExecutor {
ExecutorService executorService = Executors.newFixedThreadPool(2);
public void submit(Task task) {
executorService.submit(task);
}
}
}