Описание тега project-reactor

Reactor - это фундаментальная библиотека для приложений с быстрым реагированием на данные на JVM. Он предоставляет абстракции для Java, Groovy и других языков JVM, чтобы упростить создание приложений, управляемых событиями и данными. К тому же это очень быстро.
1 ответ

Как ограничить количество открытых сокетов в Spring-webflux WebClient?

У меня есть RESTful-сервис, и у меня была идея подготовить простой тест производительности с Reactor и Spring WebClient. Бенчмарк просто создает N пользователей, а затем за каждый созданный пользовательский пост M голосует. К сожалению, следующий ко…
0 ответов

Как сделать условный почтовый индекс с Reactor?

// there are three elements in profileSource // Profile(1, "Aris") // Profile(2, "Bob") // Profile(3, "Cathy") Flux.fromArray(new int[]{1, 3}) .zipWith(profileSource, /* a lambda to match id and profile */ ) .map( /* tuple.t1 is ID, tuple.t2 is Prof…
20 сен '18 в 10:11
4 ответа

Синтаксис для хранения результатов предыдущего отображения

Я хотел бы знать хороший способ использовать результаты предыдущего картирования в весеннем веб-потоке, например Mono.just(request) ... .flatMap(object0 -> createObject1(object0)) .flatMap(object1 -> createObject2(object1)) ... какой хороший с…
06 июн '18 в 15:00
2 ответа

Как преобразовать Reactor Flux<String> в InputStream

Учитывая, что у меня есть Flux&lt;String&gt; неизвестного размера, как я могу преобразовать его в InputStream что другая библиотека ожидает? Например, с помощью WebClient я могу добиться этого, используя этот подход WebClient.get('example.com').exch…
1 ответ

Моно асинхронная обработка исключений

Я просто пытаюсь понять, как работает обработка исключений в библиотеке реакторов. Рассмотрим следующий пример: public class FluxTest { @Test public void testIt() throws InterruptedException { Scheduler single = Schedulers.single(); CountDownLatch l…
08 июн '17 в 18:57
0 ответов

Координация задач и синхронизация в весеннем реакторе

Мы используем каркас пружинного реактора, и работа выполняется с точки зрения задач. Существует одна основная задача, и она создает несколько дочерних задач. Мне нужно сгенерировать какое-то событие, используя основную задачу, когда выполнение всех …
08 май '16 в 23:33
0 ответов

Является ли реактор-кролик потокобезопасным

Я планирую использовать клиент RabbitMQ из https://projectreactor.io/ в моем проекте Java. Но из документации ( https://projectreactor.io/docs/rabbitmq/milestone/reference) не ясно, что Отправитель или Получатель является поточно-ориентированным. Мо…
1 ответ

Поток потребления с уникальными значениями для каждого потребления

Я определил это как глобальный: Processor&lt;Integer, Integer&gt; p = RingBufferProcessor.create("test", 32); Stream&lt;List&lt;Integer&gt;&gt; s = Streams.wrap(p).distinct().buffer(5, TimeUnit.SECONDS).log().unbounded(); На строителя: s.consume(i -…
24 май '16 в 06:14
1 ответ

Реактор проекта: Как задержать выброс (газа) каждого элемента?

Рассмотрим следующее Flux Flux.range(1, 5) .parallel(10) .runOn(Schedulers.parallel()) .map(i -&gt; "https://www.google.com") .flatMap(uri -&gt; Mono.fromCallable(new HttpGetTask(httpClient, uri))) HttpGetTask это вызываемый объект, чья фактическая …
10 май '17 в 14:47
0 ответов

Получение ссылки на контекст в Spring Reactor

Я использую Spring Projectreactor реактор с активной зоной 3.1.8.RELEASE. Я внедряю каркас журналирования для моего микросервиса, чтобы иметь журналы аудита JSON, поэтому использовал контекст для хранения определенных полей, таких как идентификатор …
18 июн '18 в 21:41
1 ответ

Reactor Flux прокси для Socket.IO-клиента Java

Я реализую конечную точку Spring WebFlux, которая должна получать данные из Socket.IO-клиента Java. Я не понимаю, как собрать входящие данные в поток Flux. Могу ли я создать новый Flux как-нибудь и подписать его на эти входящие данные? Спасибо за со…
3 ответа

Spring webflux "Разрешено только одному соединению получать подписчика", если ответ сервера от switchIfEmpty

Я хотел бы поставить случай, когда, если объект существует, то отправить ошибку, если нет, то создать нового пользователя. вот мой обработчик: public Mono&lt;ServerResponse&gt; createUser(ServerRequest request) { Mono&lt;UserBO&gt; userBOMono = requ…
1 ответ

Reactor 3.x: есть ли оператор для throttleLatest (conflate)?

В rx-java 2.x есть оператор с именем throttleLatest, который объединяет входящие события на основе заданного времени: https://github.com/ReactiveX/RxJava/pull/5979 Есть ли подобный оператор в Reactor 3? Или возможно получить то же поведение, комбини…
1 ответ

Реактор StepVerifier.withVirtualTime блокируется на неопределенный срок

Я пытаюсь использовать функцию виртуального времени Reactor, но тест блокирует бесконечно (без тайм-аута) или выдает AssertionError (с таймаутом): @Test public void test() { StepVerifier.withVirtualTime(() -&gt; Flux.just(1, 2, 3, 4).delayElements(D…
24 авг '17 в 16:05
1 ответ

Как отменить подписку на элемент из потока

Рассмотрим следующий поток FluxSink&lt;String&gt; sink; Flux&lt;String&gt; flux1 = Flux .&lt;String&gt;create(emitter -&gt; { sink = emitter; },...) .cache() .publish() .autoConnect(); Таким образом, чтобы добавить / подписать элемент, мы можем сдел…
07 дек '18 в 12:47
1 ответ

Как манипулировать объектом, поступающим из Flux<Object>, со значением, исходящим от метода, излучающего Mono<Items> неблокирующим способом?

Я пытаюсь манипулировать моими объектами, полученными из Flux, с данными, полученными из Mono, где методы, излучающие Flux объекта и Mono элементов, являются разными вызовами API. Проблема в том, что я не контролирую потоки, а элементы, полученные и…
0 ответов

Реактор: Потокобезопасный способ распространения локальных переменных потока с использованием декорированного планировщика или subscriberContext?

В настоящее время я использую Reactor в своем проекте. Для многопоточности с Reactor, есть ли лучший способ распространения локальных переменных потока? В настоящее время у нас есть один подход, когда мы используем SubscriberContext и Schedulers. Др…
05 дек '18 в 16:08
1 ответ

Реактор проекта, составляющий Flux.zip()

Я пытался изучить Project Reactor 3.0 с помощью этого небольшого приложения. Я пытаюсь составить функцию Flux.zip() для объединения переменных в объект Movie. В Reactor кажется, что возвращаемый тип Flux&lt;Tuple5&lt;&gt;&gt;, В RxJava2 он возвращае…
1 ответ

Рекурсия по реактивным потокам с Project Reactor

Моя цель - просмотреть граф каталогов и записать все их имена, используя реактивные потоки и Project Reactor. Поскольку файловая система удаленная, вызовы к ней блокируются. Поэтому я хотел бы сохранить выполнение блокирующего вызова отдельно от ост…
1 ответ

Планировщик Spring Webflux (Reactor) не завершает работу после ошибок

Я новичок в Spring Webflux/Reactive Programming в Spring, и у меня есть некоторые проблемы с задачей планировщика: Планировщик (поток исполнителя) не завершает работу после возникновения ошибки, например, при попытке сохранить объект в базе данных, …