Многопроцессорность зависает после нескольких сотен заданий
Я пытаюсь использовать этот вопрос для обработки моего файла: многопроцессорная обработка Python для безопасной записи в файл
Это моя модификация кода:
def listener(q):
'''listens for messages on the q, writes to file. '''
while 1:
reads = q.get()
if reads == 'kill':
#f.write('killed')
break
for read in reads:
out_bam.write(read)
out_bam.flush()
out_bam.close()
def fetch_reads(line, q):
parts = line[:-1].split('\t')
print(parts)
start,end = int(parts[1])-1,int(parts[2])-1
in_bam = pysam.AlignmentFile(args.bam, mode='rb')
fetched = in_bam.fetch(parts[0], start, end)
reads = [read for read in fetched if (read.cigarstring and read.pos >= start and read.pos < end and 'S' not in read.cigarstring)]
in_bam.close()
q.put(reads)
return reads
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
if not args.threads:
threads = 1
else:
threads = int(args.threads)
pool = mp.Pool(threads+1)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
with open(args.bed,'r') as bed:
jobs = []
cnt = 0
for line in bed:
# Fire off the read fetchings
job = pool.apply_async(fetch_reads, (line, q))
jobs.append(job)
cnt += 1
if cnt > 10000:
break
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
print('get')
#now we are done, kill the listener
q.put('kill')
pool.close()
Разница в том, что я открываю и закрываю файл в функции, так как в противном случае я получаю необычные ошибки от bgzip.
Сначала print (parts) и print ('get') взаимозаменяемо печатаются (более или менее), затем все меньше и меньше распечаток 'get'. В конечном итоге код зависает, и ничего не печатается (все части печатаются, но "get" просто больше не печатается). Выходной файл остается нулевым байтом.
Кто-нибудь может протянуть руку? Ура!