Python запускает два цикла одновременно, где один ограничен по скорости и зависит от данных другого

У меня есть проблема в Python, где я хочу запустить два цикла одновременно. Я чувствую, что мне нужно сделать это, потому что второй цикл должен быть ограничен по скорости, но первый цикл действительно не должен быть ограничен по скорости. Кроме того, второй цикл принимает вход от первого.

Я ищу что-то, что работает примерно так:

for line in file:
do some stuff
list = []
list.append("an_item")

Rate limited:
for x in list:
do some stuff simultaneously

2 ответа

Решение

Существует два основных подхода с различными компромиссами: синхронное переключение между задачами и выполнение в потоках или подпроцессах. Во-первых, некоторые общие настройки:

from queue import Queue # or Queue, if python 2
work = Queue()

def fast_task():
    """ Do the fast thing """
    if done:
        return None
    else:
        return result

def slow_task(arg):
    """ Do the slow thing """

RATE_LIMIT = 30 # seconds

Теперь синхронный подход. Его преимущество в том, что он намного проще и проще в отладке, за счет того, что он немного медленнее. Насколько медленнее зависит от деталей ваших задач. Как это работает, мы запускаем узкий цикл, который каждый раз вызывает быструю работу, а медленную - только если прошло достаточно времени. Если быстрая работа больше не производит работу, и очередь пуста, мы уходим.

import time
last_call = 0

while True:
    next_job = fast_task()
    if next_job:
        work.put(next_job)
    elif work.empty():
        # nothing left to do
        break
    else:
        # fast task has done all its work - short sleep to slow the spin
        time.sleep(.1)

    now = time.time()
    if now - last_call > RATE_LIMIT:
        last_call = now
        slow_task(work.get())

Если вы чувствуете, что это не работает достаточно быстро, вы можете попробовать multiprocessing подход. Вы можете использовать ту же структуру для работы с потоками или процессами, в зависимости от того, импортируете ли вы из multiprocessing.dummy или же multiprocessing сам. Мы используем multiprocessing.Queue для общения вместо queue.Queue,

def do_the_fast_loop(work_queue):
    while True:
        next_job = fast_task()
        if next_job:
            work_queue.put(next_job)
        else:
            work_queue.put(None) # sentinel - tells slow process to quit
            break

def do_the_slow_loop(work_queue):
    next_call = time.time()
    while True:
        job = work_queue.get()
        if job is None: # sentinel seen - no more work to do
            break
        time.sleep(max(0, next_call - time.time()))
        next_call = time.time() + RATE_LIMIT
        slow_task(job)

if __name__ == '__main__':
    # from multiprocessing.dummy import Queue, Process # for threads
    from multiprocessing import Queue, Process # for processes
    work = Queue()
    fast = Process(target=fast_task, args=(work,))
    slow = Process(target=slow_task, args=(work,))
    fast.start()
    slow.start()
    fast.join()
    slow.join()

Как видите, вы можете реализовать гораздо больше машин, но это будет несколько быстрее. Опять же, насколько быстрее зависит многое от ваших задач. Я бы попробовал все три подхода - синхронный, многопоточный и многопроцессный - и увидел бы, какой вам больше нравится.

Вам нужно сделать 2 вещи:

  1. Поместите функцию, требующую данных от другого на свой собственный процесс
  2. Реализуйте способ связи между двумя процессами (например, очередь)

Все это должно быть сделано благодаря GIL.

Другие вопросы по тегам