Отладка потоковой проги Hadoop

У меня есть данные в форме

  id,    movieid      , date,    time
 3710100, 13502, 2012-09-10, 12:39:38.000

Теперь в основном то, что я хочу сделать, это..

Я хочу узнать, сколько раз конкретный фильм смотрят с 7 до 11 утра с 30-минутным интервалом

Так что в основном..

Сколько раз фильм смотрели между

  6 and 6:30
  6:30 and 7
   7 and 7:30
   ...
   10:30-11

Поэтому я написал картограф и редуктор для достижения этой цели.

mapper.py

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    line = line.split(",")
    #print line

    print '%s\t%s' % (line[1], line)

reducer.py

#!/usr/bin/env python

import sys
import datetime
from collections import defaultdict



def convert_str_to_date(time_str):
    try:
        timestamp =   datetime.datetime.strptime(time_str, '%Y-%m-%d:%H:%M:%S.000')  #00:23:51.000


        return timestamp

    except Exception,inst:

        pass

def is_between(time, time1,time2):
    return True if time1 <= time < time2 else False


def increment_dict(data_dict, se10,date_time):
    start_time = datetime.datetime(date_time.year,date_time.month,date_time.day, 07,00,00)
    times = [start_time]
    for i in range(8):
        start_time += datetime.timedelta(minutes = 30 )
        times.append(start_time)
    for i in range(len(times) -1 ):
        if is_between(date_time, times[i], times[i+1]):
            data_dict[se10][i] += 1






keys = [0,1,2,3,4,5,6,7]



data_dict = defaultdict(dict)


# input comes from STDIN
def initialize_entry(se10):
    for key in keys:
        data_dict[se10][key] = 0

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()


    # parse the input we got from mapper.py

    se10, orig_data = line.split('\t')
    initialize_entry(se10)
    parse_line = orig_data.split(",")

    datestr = parse_line[2].replace(" ","").replace("'","")
    timestr = parse_line[3].replace(" ","").replace("'","")

    date_time = datestr + ":" + timestr

    time_stamp = convert_str_to_date(date_time)

    increment_dict(data_dict, se10,time_stamp)


for key, secondary_key in data_dict.items():
    for skey, freq in secondary_key.items():
        print key,"," ,skey,",",freq

Приведенный выше код работает очень хорошо, если я делаю

   cat input.txt | python mapper.py | sort | python reducer.py

Но когда я разверну его на кластерах. Нельзя сказать, что работа была убита.. и эта причина неизвестна.

Пожалуйста помоги.

Благодарю.

2 ответа

Обычно хорошей идеей является чтение журналов в JobHistory, как описано в /questions/29813302/zapusk-stsenariya-r-s-ispolzovaniem-potokovoj-peredachi-hadoop-sboj-zadaniya-pipemapredwaitoutputthreads-sboj-podprotsessa-s-kodom-1/29813314#29813314. Это должно дать вам больше деталей, почему работа не удалась.

Что касается окончаний строк, класс Hadoop Streaming, используемый по умолчанию для разделения строк, представляет собой TextInputFormat. Раньше он ломался с новой строки Windows, но с 2006 года он должен работать просто отлично.

Это оставляет ваши сценарии мапперов и редукторов вероятным источником проблем. В Python 3 используется нечто, называемое универсальными переводами строк, и он должен работать просто из коробки с переводами Unix и Windows. В Python 2.7 вам нужно явно включить его.

В Linux и Mac OS X вы можете снова открыть стандартный ввод с включенными универсальными символами новой строки. sys.stdin = open('/dev/stdin', 'U'), У меня под рукой нет компьютера с Windows, но на всех трех системах должно работать следующее:

import os
import sys

# reopen sys.stdin
os.fdopen(sys.stdin.fileno(), 'U')

for line in sys.stdin:
    …

Хорошо, я понял эту вещь..

Основная проблема заключалась в том, что моя рабочая локальная машина работает на основе Windows.. тогда как кластеры на основе Linux.

поэтому мне пришлось конвертировать файл, написанный в DOS, в формат Unix..

Другие вопросы по тегам