Как файлы данных должны быть включены в mrjob на EMR?
Я пытаюсь запустить mrjob на EMR Amazon. Я тестировал работу локально, используя встроенный бегун, но он не работает при работе на Amazon. Я сузил ошибку до моей зависимости от внешнего файла данных zip_codes.txt
, Если я запускаю без этой зависимости, используя жестко закодированные данные почтового индекса, это работает просто отлично.
Я попытался включить необходимый файл данных, используя аргумент файла загрузки. Когда я смотрю на S3, файл туда попал, но явно что-то идет не так, что я не могу получить к нему доступ локально.
Вот мой mrjob.conf
файл:
runners:
emr:
aws_access_key_id: FOOBARBAZQUX
aws_secret_access_key: IAMASECRETKEY
aws_region: us-east-1
ec2_key_pair: mapreduce
ec2_key_pair_file: $ENV/keys/mapreduce.pem
ssh_tunnel_to_job_tracker: true
ssh_tunnel_is_open: true
cleanup_on_failure: ALL
cmdenv:
TZ: America/Los_Angeles
Это мое MR_zip.py
файл.
from mrjob.job import MRJob
import mrjob
import csv
def distance(p1, p2):
# d = ...
return d
class MR_zip(MRJob):
OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}
def mapper(self, _, line):
zip_code_1, poi = line.split(",")
zip_code_1 = int(zip_code_1)
lat1, lon1 = self.zip_codes[zip_code_1]
for zip_code_2, (lat2, lon2) in self.zip_codes.items():
d = distance((lat1, lon1), (lat2, lon2))
yield zip_code_2, (zip_code_1, poi, d)
def reducer(self, zip_code_1, ds):
result = {}
for zip_code_2, poi, d in ds:
if poi not in result:
result[poi] = (zip_code_2, d)
elif result[poi][1] > d:
result[poi] = (zip_code_2, d)
yield zip_code_1, result
if __name__ == '__main__':
MR_zip.run()
И наконец, я запускаю его с помощью следующей команды:
python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt
Где zip_codes.txt выглядит так:
...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...
И poi.txt выглядит так:
...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...
2 ответа
обзор
В моем коде было две ошибки:
- Код инициализации для шага должен быть в инициализаторе шага
- По умолчанию EMR использует Python 2.6, который, помимо прочего, не допускает словарного понимания
Шаг инициализации
Каждый шаг имеет соответствующий метод инициализатора. Например, mapper
имеет mapper_init
который может быть использован для инициализации данных, используемых в маппере. Функции reducer
а также combiner
имеют похожие методы инициализации. Если вы используете steps
функция, чтобы определить свои собственные шаги, то вы также можете определить, какую функцию инициализации вы используете. Подробнее об инициализаторах читайте здесь.
Остерегайтесь версии Python
На сегодняшний день EMR по умолчанию использует Python версии 2.6.6. Таким образом, любые зависимости от более поздних версий могут работать локально, но возникают проблемы с EMR.
Исправление
Чтобы исправить приведенный выше код, необходимо убрать строку, определяющую zip_codes
в MR_zip.py
zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}
и вместо этого определить его внутри mapper_init
без использования словарных пониманий.
def mapper_init(self):
self.zip_codes = {}
for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")):
self.zip_codes[int(zip_code)] = (float(latitude), float(longitude))
Другие файлы и командная строка остаются прежними.
Кроме того, вы можете найти полезное MRJob.add_file_option
рутина. Например, указав
self.add_file_option('--config-file', dest='config_file',
default=None, help='file with labels', action="append")
Вы можете ссылаться на загруженные файлы через self.options.config_file
список путей.