"Мостовой" поток реактора от gRPC StreamObserver

Я хочу создать Reactor Flux из gRPC StreamObserver. Это необходимо сделать до тех пор, пока StreamObserver не реализует соответствующие интерфейсы изначально (см., Например, эту проблему).

То, что я придумал, примерно так:

final StreamObserver<ProtoResponse>[] streamObserverArray = new  StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
        @Override
        public void onNext(ProtoResponse value) {
            final Response response = convertFromProto(value);
            sink.next(response);
        }

        @Override
        public void onError(Throwable throwable) {
            sink.error(throwable);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    });
myFlux
    .doOnError(throwable -> {/* actual logic in here */}) //
    .doOnComplete(() -> {/* actual logic in here */}) //
    .doOnCancel(() -> {/* actual logic in here */}) //
    .parallel() //
    .runOn(Schedulers.parallel()) //
    .doOnNext(/* actual heavy lifting logic in here */) //
    .map(/* ... */) //
    .sequential() //
    .doOnNext(/* ...*/) //
    .subscribe(); // needed to start the actual processing of the events on this Flux

MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);

Основная идея, почему я хочу использовать Reactor, заключается в том, чтобы распределить "тяжелую работу" по нескольким потокам параллельно, а не делать это в потоках запросов gRPC.

Я вижу несколько проблем с подходом, как это сделано выше:

  • Мне действительно не нравится обходной путь с StreamObserver[] массив
  • Мне нужно сначала создать полный поток, потому что, если я не закончу его с .subscribe() во-первых, StreamObserver возможно null когда gRPC начинает общаться (так называемое состояние гонки).
  • Я не уверен, что обратное давление работает так, как задумано (хотя в настоящее время это не моя главная задача).

Поэтому у меня возникнут следующие вопросы: каков наилучший / предпочтительный способ подключения gRPC StreamObserver к потоку Reactor? Есть ли лучшие практики?

1 ответ

Решение

После того, как я немного покопался и немного лучше понял весь реактивный материал, я нашел следующее решение:

/**
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
*/
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {

    private Subscriber<? super Long> subscriber;

    @Override
    public void onNext(Long l) {
        subscriber.onNext(l);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onCompleted() {
        subscriber.onComplete();
    }

    @Override
    public void subscribe(Subscriber<? super Long> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new BaseSubscriber() {});
    }
}

// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);

В настоящее время есть более простое решение:

https://github.com/salesforce/reactive-grpc.

Он поддерживает соединение gRPC как с Reactor, так и с RxJava 2.

Другие вопросы по тегам