Диско цепочки пропуски уменьшить
Недавно я нашел Disco Project, и он мне очень нравится по сравнению с Hadoop, но у меня есть проблема. Мой проект настроен так (я буду рад вырезать / вставить настоящий код, если это поможет):
myfile.py
from disco.core import Job, result_iterator
import collections, sys
from disco.worker.classic.func import chain_reader
from disco.worker.classic.worker import Params
def helper1():
#do stuff
def helper2():
#do stuff
.
.
.
def helperN():
#do stuff
class A(Job):
@staticmethod
def map_reader(fd, params):
#Read input file
yield line
def map(self, line, params):
#Process lines into dictionary
#Iterate dictionary
yield k, v
def reduce(self, iter, out, params):
#iterate iter
#Process k, v into dictionary, aggregating values
#Process dictionry
#Iterate dictionary
out.add(k,v)
Class B(Job):
map_reader = staticmethod(chain_reader)
map = staticmethod(nop_map)
reduce(self, iter, out, params):
#Process iter
#iterate results
out.add(k,v)
if __name__ == '__main__':
from myfile import A, B
job1 = A().run(input=[input_filename], params=Params(k=k))
job2 = B().run(input=[job1.wait()], params=Params(k=k))
with open(output_filename, 'w') as fp:
for count, line in result_iterator(job2.wait(show=True)):
fp.write(str(count) + ',' + line + '\n')
Моя проблема в том, что поток работ полностью пропускает уменьшение А и понижается до уменьшения Б.
Есть идеи, что здесь происходит?
1 ответ
Это была легкая, но тонкая проблема: у меня не было
show = True
для работы1. По какой-то причине в show set для job2 он показывал мне шаги map() и map-shuffle() из job1, поэтому, поскольку я не получил ожидаемый конечный результат, и ввод в одну из функций job2 выглядит неправильно Я пришел к выводу, что шаги job1 не были выполнены должным образом (это было также поддержано, прежде чем я добавил job2, я проверил точность вывода job1).