"Мостовой" поток реактора от gRPC StreamObserver
Я хочу создать Reactor Flux из gRPC StreamObserver. Это необходимо сделать до тех пор, пока StreamObserver не реализует соответствующие интерфейсы изначально (см., Например, эту проблему).
То, что я придумал, примерно так:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
Основная идея, почему я хочу использовать Reactor, заключается в том, чтобы распределить "тяжелую работу" по нескольким потокам параллельно, а не делать это в потоках запросов gRPC.
Я вижу несколько проблем с подходом, как это сделано выше:
- Мне действительно не нравится обходной путь с
StreamObserver[]
массив - Мне нужно сначала создать полный поток, потому что, если я не закончу его с
.subscribe()
во-первых,StreamObserver
возможноnull
когда gRPC начинает общаться (так называемое состояние гонки). - Я не уверен, что обратное давление работает так, как задумано (хотя в настоящее время это не моя главная задача).
Поэтому у меня возникнут следующие вопросы: каков наилучший / предпочтительный способ подключения gRPC StreamObserver к потоку Reactor? Есть ли лучшие практики?
1 ответ
После того, как я немного покопался и немного лучше понял весь реактивный материал, я нашел следующее решение:
/**
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
*/
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {
private Subscriber<? super Long> subscriber;
@Override
public void onNext(Long l) {
subscriber.onNext(l);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onCompleted() {
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
this.subscriber = subscriber;
this.subscriber.onSubscribe(new BaseSubscriber() {});
}
}
// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);
В настоящее время есть более простое решение:
https://github.com/salesforce/reactive-grpc.
Он поддерживает соединение gRPC как с Reactor, так и с RxJava 2.