Python: Как я могу индексировать в MapReduce(MRJob)?

Я хочу проиндексировать результат редуктора следующим образом:

1   "EZmocAborM6z66rTzeZxzQ"
2   "FIk4lQQu1eTe2EpzQ4xhBA"
3   "myql3o3x22_ygECb8gVo7A"
4   "ojovtd9c8GIeDiB8e0mq2w"
5   "uVEoZmmL9yK0NMgadLL0CQ"

мой PythonMRJob код:

class MRUserDic(MRJob):
    count = 1

    def mapper(self, _, line):
        line = json.loads(line)
        yield line['user_id'], 1

    def reducer(self, key, values):
        yield self.count, key
        self.count += 1

if __name__ == '__main__':
    MRUserDic.run()

Но этот результат в:

1   "EZmocAborM6z66rTzeZxzQ"
2   "FIk4lQQu1eTe2EpzQ4xhBA"
3   "myql3o3x22_ygECb8gVo7A"
1   "ojovtd9c8GIeDiB8e0mq2w"
2   "uVEoZmmL9yK0NMgadLL0CQ"

Я знаю, что это происходит потому, что редукторы работают на разных машинах.

Есть ли способ разделить переменную счета среди редукторов?

1 ответ

Чтобы отсортировать выходные данные редуктора, вам нужно будет загрузить результаты в память, что можно сделать с помощью бегуна. Сохраните ваш код в своем собственном.py файле (MRUserDic.py) и внедрите средство запуска для сортировки вывода редуктора:

from MRUserDic import MRUserDic

reducer_output = []
mr_job = MRUserDic(args=['input_file.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        reducer_output.append(line)

    sorted_output = sorted(reducer_output)

Просто замените input_file.txt на местоположение вашего входного файла.

Другие вопросы по тегам