Python: Как я могу индексировать в MapReduce(MRJob)?
Я хочу проиндексировать результат редуктора следующим образом:
1 "EZmocAborM6z66rTzeZxzQ"
2 "FIk4lQQu1eTe2EpzQ4xhBA"
3 "myql3o3x22_ygECb8gVo7A"
4 "ojovtd9c8GIeDiB8e0mq2w"
5 "uVEoZmmL9yK0NMgadLL0CQ"
мой Python
MRJob
код:
class MRUserDic(MRJob):
count = 1
def mapper(self, _, line):
line = json.loads(line)
yield line['user_id'], 1
def reducer(self, key, values):
yield self.count, key
self.count += 1
if __name__ == '__main__':
MRUserDic.run()
Но этот результат в:
1 "EZmocAborM6z66rTzeZxzQ"
2 "FIk4lQQu1eTe2EpzQ4xhBA"
3 "myql3o3x22_ygECb8gVo7A"
1 "ojovtd9c8GIeDiB8e0mq2w"
2 "uVEoZmmL9yK0NMgadLL0CQ"
Я знаю, что это происходит потому, что редукторы работают на разных машинах.
Есть ли способ разделить переменную счета среди редукторов?
1 ответ
Чтобы отсортировать выходные данные редуктора, вам нужно будет загрузить результаты в память, что можно сделать с помощью бегуна. Сохраните ваш код в своем собственном.py файле (MRUserDic.py) и внедрите средство запуска для сортировки вывода редуктора:
from MRUserDic import MRUserDic
reducer_output = []
mr_job = MRUserDic(args=['input_file.txt'])
with mr_job.make_runner() as runner:
runner.run()
for line in runner.stream_output():
reducer_output.append(line)
sorted_output = sorted(reducer_output)
Просто замените input_file.txt на местоположение вашего входного файла.