multiprocessing.Manager() выдает в Python

Мои извинения за дамп кода ниже, но я решил, что ошибаюсь из-за слишком большого контекста, а не из-за слишком малого.

Я пытаюсь написать асинхронный декоратор, который использует multiprocessing.Manager() для разделяемой памяти. В качестве тестового примера я дважды вызываю функцию, которая добавляет 1 к переменной экземпляра manager.Namespace(). После обоих вызовов я проверяю значение этой переменной.

80% времени я вижу то, что ожидал: значение 2. Примерно в 20% случаев значение переменной составляет всего 1, и я не понимаю, что может быть причиной этой проблемы. Есть идеи?

from multiprocessing import Queue, Process, Manager
from collections import defaultdict

def queue_function(fn, args, kwargs):
  async.q.put([fn(*args, **kwargs), id(fn)])

def start_process(fn, args, kwargs):
  p = Process(target=queue_function, args=(fn, args, kwargs))
  p.start()

  async.processes.append(p)

def cleanup():
  # ensure no processes remain in a zombie state
  while async.processes:
    p = async.processes.pop()
    p.join()

def merge_dicts(d1, d2):
  for key in ['args', 'kwargs']:
    d1[key] += d2.get(key, [])
  return d1

class async:
  tree = {}
  q = Queue()
  processes = []
  map = {}
  manager = Manager()
  fn_map = {}

  def __init__(self, callback=False, dependencies=set()):
    self.callback = callback
    self.dependencies = dependencies

  def __call__(self, fn):
    """Returns decorated function"""
    def async_fn(*args, **kwargs):
      fn_call = {'args': [args], 'kwargs': [kwargs]}
      async.tree[fn] = merge_dicts(fn_call, async.tree.get(fn, {}))

    # functions cannot be added to queue
    # work around this by passing an id inst
    async.fn_map[id(fn)] = fn

    #mapping from decorated function to undecorated function
    async.map[async_fn] = fn
    return async_fn

  @classmethod
  def begin(self):
    # applies fn(*args) for each obj in object, ensuring
    # that the proper attributes of shared_data exist before calling a method

    # because some functions depend on the results of other functions, this is 
    # a semi-synchronous operation -- certain methods must be guaranteed to
    # terminate before others 

    # aliasing
    tree, q, processes = async.tree, async.q, async.processes
    fn_map = async.fn_map

    # start a new process for each object that has no dependencies
    for fn, v in tree.items():
      for i in range(len(v['args'])):
        args, kwargs = v['args'].pop(), v['kwargs'].pop()
        start_process(fn, args, kwargs)

    # read from queue as items are added
    i = 0
    while i < len(processes):

      # update note with new data
      result, fn_id = async.q.get()
      fn = fn_map[fn_id]
      i += 1

    cleanup()

if __name__ == '__main__':

  @async()
  def add(x):
    shared.sum += x

    return shared.sum

  d = defaultdict(int)

  for i in range(100):
    #shared data
    shared = async.manager.Namespace()
    shared.sum = 0
    add(1)
    add(1)
    async.begin()
    d[shared.sum] += 1

  print d

0 ответов

Другие вопросы по тегам