Описание тега monix

Monix is a library providing asynchronous programming facilities for Scala and Scala.js.
1 ответ

Как транслировать холодную наблюдаемую: повтор с обратным давлением?

Я на самом деле использую Scala, но этот вопрос является общим для всех Rx и потоковых фреймворков. Мой вариант использования состоит в том, что у меня есть сгенерированная наблюдаемая (поэтому холодная) и я хочу, чтобы несколько потребителей исполь…
1 ответ

Как обработать необработанное исключение бросить в monix onErrorHandle

Я использую задачи Monix, и я пытаюсь поймать Throwable, а затем преобразовать в пользовательскую ошибку. Я удалил / изменил код, чтобы он был простым и актуальным. Это код (вопрос следует за фрагментом кода): import io.netty.handler.codec.http.Http…
19 ноя '17 в 23:06
1 ответ

Как справиться с рекурсией с помощью наблюдаемой моникса?

Используя monix, я пытаюсь пройти по графику, построив Observable[Node] и используя широкий алгоритм. Однако там у меня есть небольшая проблема рекурсии. Вот фрагмент, иллюстрирующий мою проблему: package gp import monix.eval.Task import monix.execu…
25 дек '17 в 13:30
1 ответ

Понимание баланса потребительской нагрузки monix

Я учусь monix 3,Следующий код: object Main extends TaskApp { override def runc = { Observable.fromIterable(1 to 10) .map{i => val delay = Random.nextInt(1000) + 1000 println(s"Starting $i, delay = $delay") Thread.sleep(delay) // Imitation of hard…
17 авг '18 в 10:37
0 ответов

Задача Monix firstCompletedOf, которая также игнорирует ошибки

У меня есть случай использования, когда мне нужно сделать запрос к каждому серверу, однако меня интересует только первый успешный результат (и я хочу игнорировать любые возможные исключения). Я пытаюсь использовать Monix 2.x для выполнения этой зада…
23 июл '18 в 09:44
1 ответ

Эффективный способ создания абстрактной коллекции с кошками

У меня есть код, который использует Monix Observable для потоковой обработки файла. Чтобы проверить этот код, я хотел бы выполнить операции, которые я выполняю на Observable быть независимым от типа, поэтому я также могу выполнять их на любой другой…
05 дек '18 в 09:50
0 ответов

Наблюдаемая группа Monix по большому количеству ключей без утечек памяти

Я пытаюсь выполнить расщепление сингла Observable в Monix по ключу, затем группа до последнего n события в каждом GrouppedObservable и отправить их для дальнейшей обработки. Проблема в том, что количество ключей для группировки, возможно, бесконечно…
06 янв '19 в 13:16
0 ответов

Какой подход к планированию следует использовать для задач ввода-вывода, которые могут блокировать на неопределенный срок до определенной точки?

Я сомневаюсь, какую стратегию планирования использовать для блокировки задач ввода-вывода в приведенном ниже коде. Используя метод scheduleWithFixedDelayDispatchTask, кажется, что я буду создавать задачи одну за другой, накапливая задачи, которые мо…
18 янв '19 в 02:03
3 ответа

Как глобально упорядочить несколько упорядоченных наблюдаемых в Monix

Предположим, у меня есть несколько итераторов, которые упорядочены. Если бы я хотел объединить эти итераторы при глобальном их упорядочении (например, [(1,3,4), (2,4,5)] -> [1,2,3,4,4,5]) используя monix, как бы я это сделал?
18 июл '16 в 19:50
0 ответов

Моделирование нескольких вызовов функций с помощью потока (безопасным способом FP)

Учитывая функцию A => IO[B] (ака Kleisli[IO, A, B]) который должен вызываться несколько раз и имеет побочные эффекты, такие как обновление БД, как делегировать такие множественные вызовы в поток (я полагаю, Pipe[IO, A, B]) (fs2, monix наблюдаемый…
25 фев '19 в 10:10
1 ответ

Псевдоним типа и неявное разрешение значения

Я предполагаю, что это легко, но я столкнулся с проблемой, когда FlatMap или Functor задачи Monix не могут быть найдены. Я использую "псевдоним типа", чтобы упростить сигнатуру длинного типа, но затем в for-comprehession я получаю ошибку "Не удалось…
20 июл '18 в 06:55
2 ответа

Почему отображение класса A на класс B с помощью monix или akka-streams происходит так медленно?

Я сравнил отображение List[ClassA] с List[ClassB] с помощью потоков monix и akka, но я не понимаю, почему это так медленно. Я пробовал другой способ сопоставления, и вот результат с JMH: [info] Benchmark Mode Cnt Score Error Units [info] MappingBenc…
05 сен '17 в 10:29
1 ответ

Идиоматический способ обработки нескольких одновременных потоков в Scala

У меня есть список потоков, которые при вызове их next() будет спать случайное количество времени, а затем читать один символ из другого источника. Я пытаюсь написать потребителя (ей), которые будут продолжать вызывать эти потоки до EOF и создать об…
28 май '18 в 12:36
0 ответов

Как Задача вызывающей стороны избежать того, чтобы ее планировщик изменился подзадачей, над которой она перешла

Я ищу способ, которым Task (т. е. внешняя область) может выполнить "подзадачу", используя flatMap или что-то эквивалентное и убедитесь, что любые последующие вызовы в цепочке во внешней области используют оригинальный планировщик. Используемые библи…
22 июн '18 в 23:01
1 ответ

Akka Actor Поиск и трансляция событий

У меня есть сценарий, в котором у меня работает группа актеров Akka, каждый из которых представляет устройство IoT. У меня есть веб-приложение, основанное на Play, внутри которого эти Актеры работают и подключены к этим устройствам IoT. Теперь я хоч…
25 фев '18 в 07:11
1 ответ

Использование Monix Debounce Observable

Я пробую некоторые из операций, которые я мог бы сделать на Observable от Monix. Я наткнулся на этот оператор debounce и не мог понять его поведение: Observable.interval(5.seconds).debounce(2.seconds) Этот выше только излучает Long каждые 5 секунд. …
14 фев '18 в 21:19
1 ответ

Monix: InputStreamObservable не поддерживает нескольких подписчиков

Я пытаюсь разделить Observable of (String, Date) на две разные Observables и сжать их вместе следующим образом import monix.execution.Scheduler.Implicits.global val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator) va…
07 фев '18 в 06:04
1 ответ

Как построение вычислений для больших задач сравнивается с одновременным выполнением нескольких шагов?

У меня есть следующие две части кода, написанные на Scala/Monix: def f1(input) = for { a <- task1(input) b <- task2(a) c <- task3(b) } yield (c).runSyncUnsafe а также def f2(input) = { val a = task1(input).runSyncUnsafe val b = task2(a).run…
04 окт '18 в 05:47
2 ответа

Каков наилучший способ обернуть задачу моникса временем начала и окончания печати?

Это то, что я пытаюсь сейчас, но он печатает только "эй", а не метрики. Я не хочу добавлять метрическую информацию в основную функцию. import java.util.Date import monix.eval.Task import monix.execution.Scheduler.Implicits.global import scala.concur…
22 июн '18 в 09:20
2 ответа

Monix Coeval.memoize дует в стек

Определять def memoizeCoeval(n: Int): Coeval[Int] = { if (n <= 1) Coeval.now(1) else Coeval.defer(memoizeCoeval(n - 1)).map(_ + 1).memoize } Сейчас memoizeCoeval(10000).value взрывает стек Если мы удалим .memoize от рекурсивного вызова, это работ…
07 май '18 в 22:31