Примеры использования планировщиков rxJava
В RxJava есть 5 разных планировщиков на выбор:
немедленный (): создает и возвращает планировщик, который выполняет работу немедленно в текущем потоке.
trampoline (): создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.
newThread (): создает и возвращает планировщик, который создает новый поток для каждой единицы работы.
computation (): создает и возвращает планировщик, предназначенный для вычислительной работы. Это может быть использовано для обработки событий, обработки обратных вызовов и другой вычислительной работы. Не выполняйте связанную с IO работу с этим планировщиком. Используйте планировщики. IO () вместо.
io (): создает и возвращает планировщик, предназначенный для работы, связанной с вводом-выводом. Реализация поддерживается пулом потоков Executor, который будет расти по мере необходимости. Это может быть использовано для асинхронного выполнения блокировки ввода-вывода. Не выполняйте вычислительную работу с этим планировщиком. Используйте планировщики. вместо вычисления ().
Вопросы:
Первые 3 планировщика говорят сами за себя; Тем не менее, я немного запутался в вычислениях.
- Что такое "работа, связанная с IO"? Используется ли для работы с потоками (
java.io
) и файлы (java.nio.files
)? Используется ли он для запросов к базе данных? Используется ли он для загрузки файлов или доступа к API REST? - Чем вычисление () отличается от newThread ()? Все ли вызовы computation () каждый раз выполняются в одном (фоновом) потоке, а не в новом (фоновом)?
- Почему плохо вызывать computation () при выполнении операций ввода-вывода?
- Почему плохо вызывать io () при выполнении вычислительной работы?
2 ответа
Отличные вопросы, я думаю, что документация могла бы быть более подробной.
io()
поддерживается неограниченным пулом потоков и это то, что вы бы использовали для задач, не требующих большого объема вычислений, то есть то, что не сильно нагружает процессор. Таким образом, да, взаимодействие с файловой системой, взаимодействие с базами данных или службами на другом хосте являются хорошими примерами.computation()
поддерживается ограниченным пулом потоков с размером, равным количеству доступных процессоров. Если вы пытались планировать интенсивную работу процессора параллельно более чем на доступных процессорах (скажем, с помощьюnewThread()
) тогда вы готовы к накладным расходам на создание потоков и переключению контекста, так как потоки соперничают за процессор, и это потенциально может сильно снизить производительность.- Лучше всего уйти
computation()
только для интенсивной работы процессора, иначе вы не получите хорошую загрузку процессора. - Плохо звонить
io()
для вычислительной работы по причине, обсуждаемой в 2.io()
неограничен, и если вы планируете тысячу вычислительных задач наio()
параллельно каждая из этих тысяч задач будет иметь свой собственный поток и будет конкурировать за процессор, что влечет за собой затраты на переключение контекста.
Наиболее важным моментом является то, что и Schedulers.io, и Schedulers.computation поддерживаются неограниченными пулами потоков, в отличие от других, упомянутых в вопросе. Эта характеристика является общей для Schedulers.from(Executor) только в том случае, если Исполнитель создан с помощью newCachedThreadPool (без ограничений с пулом потоков с автоматическим возвратом).
Как обильно объяснено в предыдущих ответах и многочисленных статьях в Интернете, Schedulers.io и Schedulers.computation должны использоваться осторожно, поскольку они оптимизированы для типа работы от их имени. Но, на мой взгляд, они играют важнейшую роль в обеспечении реального параллелизма реактивным потокам.
Вопреки убеждению новичков, реактивные потоки не являются изначально параллельными, но по своей сути асинхронными и последовательными. По этой самой причине Schedulers.io должен использоваться только тогда, когда операция ввода-вывода блокируется (например, с помощью команды блокировки, такой как Apache IOUtils FileUtils.readFileAsString (...)), таким образом, будет зависать вызывающий поток до тех пор, пока операция не будет завершена. сделанный.
Использование асинхронного метода, такого как Java AsynchronousFileChannel (...) не будет блокировать вызывающий поток во время операции, поэтому нет смысла использовать отдельный поток. На самом деле потоки Schedulers.io не очень подходят для асинхронных операций, так как они не запускают цикл обработки событий и обратный вызов никогда... не будет вызван.
Та же логика применяется для доступа к базе данных или удаленных вызовов API. Не используйте Schedulers.io, если вы можете использовать асинхронный или реактивный API для выполнения вызова.
Вернуться к параллелизму. У вас может не быть доступа к асинхронному или реактивному API для асинхронного или одновременного выполнения операций ввода-вывода, поэтому единственной альтернативой является отправка нескольких вызовов в отдельном потоке. Увы, реактивные потоки являются последовательными на своих концах, но хорошая новость заключается в том, что оператор flatMap() может вводить параллелизм в своей основе.
Параллельность должна быть встроена в потоковую конструкцию, обычно используя оператор flatMap(). Этот мощный оператор может быть настроен для внутреннего предоставления многопоточного контекста для вашей встроенной функции flatMap()
Более подробную информацию можно найти в статьях о планировщиках и параллелизме RxJava2, где вы найдете пример кода и подробные объяснения о том, как использовать планировщики последовательно и одновременно.
Надеюсь это поможет,
Softjake
Из сообщения в блоге:
Schedulers.io () поддерживается неограниченным пулом потоков. Он используется для операций ввода-вывода без интенсивной загрузки ЦП, включая взаимодействие с файловой системой, выполнение сетевых вызовов, взаимодействие с базой данных и т. Д. Этот пул потоков предназначен для асинхронного блокирующего ввода-вывода.
Schedulers.computation () поддерживается ограниченным пулом потоков размером до числа доступных процессоров. Он используется для вычислительной или ресурсоемкой работы, такой как изменение размера изображений, обработка больших наборов данных и т. Д. Будьте осторожны: когда вы выделяете больше вычислительных потоков, чем доступных ядер, производительность снижается из-за переключения контекста и накладных расходов на создание потоков, поскольку потоки соперничают за время процессоров.
Schedulers.newThread () создает новый поток для каждой запланированной единицы работы. Этот планировщик дорог, так как каждый раз создается новый поток, и повторное использование не происходит.
Schedulers.from (Executor executor) создает и возвращает собственный планировщик, поддерживаемый указанным исполнителем. Чтобы ограничить количество одновременных потоков в пуле потоков, используйте Scheduler.from(Executors.newFixedThreadPool(n)). Это гарантирует, что если задача запланирована, когда все потоки заняты, она будет поставлена в очередь. Потоки в пуле будут существовать до тех пор, пока он не будет явно отключен.
Основной поток или AndroidSchedulers.mainThread() предоставляется библиотекой расширений RxAndroid для RxJava. Основной поток (также известный как поток пользовательского интерфейса) - это место, где происходит взаимодействие с пользователем. Следует позаботиться о том, чтобы не перегружать этот поток, чтобы не допустить дергания неотвечающего пользовательского интерфейса или, что еще хуже, диалога "Приложение не отвечает" (ANR).
Schedulers.single () является новым в RxJava 2. Этот планировщик поддерживается одним потоком, выполняющим задачи последовательно в запрошенном порядке.
Schedulers.trampoline () выполняет задачи в порядке FIFO (First In, First Out) одним из участвующих рабочих потоков. Он часто используется при реализации рекурсии, чтобы избежать увеличения стека вызовов.