Как выполнить окончательный оператор print один раз в многошаговой программе уменьшения карты?
Я в основном пытаюсь внедрить систему рекомендаций, расширяя ее на Hadoop.
На первом этапе я пытаюсь вычислить сходство между каждой парой элементов во входном файле. Если я храню его просто как
{Пункт A, Пункт B, Сходство}
размер выходного файла становится очень очень большим (для ввода 60 КБ я получаю размер выходного файла 6 МБ).
Поэтому я подумал, будет ли лучше хранить результат в python dict и печатать dict ТОЛЬКО ОДИН РАЗ после окончания всей программы уменьшения карты. Мне не удается это сделать. Пожалуйста, помогите мне.
Мой код Python:
#!/usr/bin/env python
from mrjob.job import MRJob
from math import sqrt
from itertools import combinations
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0
prefs={}
def correlation(size, dot_product, rating_sum, \
rating2sum, rating_norm_squared, rating2_norm_squared):
'''
The correlation between two vectors A, B is
[n * dotProduct(A, B) - sum(A) * sum(B)] /
sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
'''
numerator = size * dot_product - rating_sum * rating2sum
denominator = sqrt(size * rating_norm_squared - rating_sum * rating_sum) * \
sqrt(size * rating2_norm_squared - rating2sum * rating2sum)
return (numerator / (float(denominator))) if denominator else 0.0
def regularized_correlation(size, dot_product, rating_sum, \
rating2sum, rating_norm_squared, rating2_norm_squared,
virtual_cont, prior_correlation):
'''
The Regularized Correlation between two vectors A, B
RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
where w = # actualPairs / (# actualPairs + # virtualPairs).
'''
unregularizedCorrelation = correlation(size, dot_product, rating_sum, \
rating2sum, rating_norm_squared, rating2_norm_squared)
w = size / float(size + virtual_cont)
return w * unregularizedCorrelation + (1.0 - w) * prior_correlation
class SemicolonValueProtocol(object):
# don't need to implement read() since we aren't using it
def write(self, key, values):
return ';'.join(str(v) for v in values)
class BooksSimilarities(MRJob):
#OUTPUT_PROTOCOL = SemicolonValueProtocol
def steps(self):
return [
self.mr(mapper=self.group_by_user_rating,
reducer=self.count_ratings_users_freq),
self.mr(mapper=self.pairwise_items,
reducer=self.calculate_similarity),
self.mr(mapper=self.calculate_ranking,
reducer=self.top_similar_items)]
def group_by_user_rating(self, key, line):
'''
Emit the user_id and group by their ratings (item and rating)
17 70,3
35 21,1
49 19,2
49 21,1
49 70,4
87 19,1
87 21,2
98 19,2
'''
line=line.replace("\"","");
user_id, item_id, rating = line.split(',')
yield user_id, (item_id, float(rating))
def count_ratings_users_freq(self, user_id, values):
'''
For each user, emit a row containing their "postings"
(item,rating pairs)
Also emit user rating sum and count for use later steps.
17 1,3,(70,3)
35 1,1,(21,1)
49 3,7,(19,2 21,1 70,4)
87 2,3,(19,1 21,2)
98 1,2,(19,2)
'''
item_count = 0
item_sum = 0
final = []
for item_id, rating in values:
item_count += 1
item_sum += rating
final.append((item_id, rating))
yield user_id, (item_count, item_sum, final)
def pairwise_items(self, user_id, values):
'''
The output drops the user from the key entirely, instead it emits
the pair of items as the key:
19,21 2,1
19,70 2,4
21,70 1,4
19,21 1,2
'''
item_count, item_sum, ratings = values
for item1, item2 in combinations(ratings, 2):
yield (item1[0], item2[0]), (item1[1], item2[1])
def calculate_similarity(self, pair_key, lines):
'''
Sum components of each corating pair across all users who rated both
item x and item y, then calculate pairwise pearson similarity and
corating counts. The similarities are normalized to the [0,1] scale
because we do a numerical sort.
19,21 0.4,2
21,19 0.4,2
19,70 0.6,1
70,19 0.6,1
21,70 0.1,1
70,21 0.1,1
'''
sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
item_pair, co_ratings = pair_key, lines
item_xname, item_yname = item_pair
for item_x, item_y in lines:
sum_xy += item_x * item_y
sum_y += item_y
sum_x += item_x
sum_xx += item_x * item_x
sum_yy += item_y * item_y
n += 1
reg_corr_sim = regularized_correlation(n, sum_xy, sum_x, \
sum_y, sum_xx, sum_yy, PRIOR_COUNT, PRIOR_CORRELATION)
yield (item_xname, item_yname), (reg_corr_sim, n)
def calculate_ranking(self, item_keys, values):
'''
Emit items with similarity in key for ranking:
19,0.4 70,1
19,0.6 21,2
21,0.6 19,2
21,0.9 70,1
70,0.4 19,1
70,0.9 21,1
'''
reg_corr_sim, n = values
item_x, item_y = item_keys
if int(n) > 0:
yield (item_x, reg_corr_sim),(item_y, n)
def top_similar_items(self, key_sim, similar_ns):
'''
For each item emit K closest items in comma separated file:
De La Soul;A Tribe Called Quest;0.6;1
De La Soul;2Pac;0.4;2
'''
item_x, reg_corr_sim = key_sim
for item_y, n in similar_ns:
#yield None, (item_x, item_y, reg_corr_sim, n)
prefs.setdefault(item_x,{})
prefs[item_x][item_y] = float(reg_corr_sim)
prefs.setdefault(item_y,{})
prefs[item_y][item_x] = float(reg_corr_sim)
print "exiting"
if __name__ == '__main__':
BooksSimilarities.run()
Итак, что я хочу после выполнения
python thisfile.py
это сравнительно небольшой выходной файл без повторов и единого слова.
Короче,
В настоящее время эта программа печатает выход из n раз, но я хочу, чтобы он печатал только ОДИН РАЗ.
Помимо всего этого есть ли лучший способ реализовать совместную фильтрацию, улучшив масштабирование Hadoop.
Большое спасибо заранее.
1 ответ
У вас есть только гарантия, что значения с одним и тем же ключом перейдут к одному и тому же редуктору. Поэтому, если вы используете на своем кластере несколько редукторов, работа будет разделена, и у вас будет много "выходов", поскольку редукторы будут запускаться для выполнения задачи на всех ваших ключах.
Попробуйте запустить локально и проверить, если он работает:
python thisfile.py
Возможно, вы можете определить "reducer_final" в вашем steps(), чтобы получить все выходные данные редуктора последнего шага и управлять им так, как вы хотите.
Проверьте: http://pythonhosted.org/mrjob/job.html
С уважением,