Нечеткая логика в больших наборах данных, использующих Python
Моя команда застряла с запуском алгоритма нечеткой логики для двух больших наборов данных. Первый (подмножество) содержит около 180 тысяч строк, в которых содержатся имена, адреса и электронные письма людей, которым мы должны соответствовать во втором (расширенный набор). Суперсет содержит 2,5 млн записей. Оба имеют одинаковую структуру, и данные уже очищены, т.е. проанализированы адреса, нормализованы имена и т. Д.
- ContactID int,
- FullName varchar (150),
- Адрес varchar(100),
- Электронная почта varchar (100)
Цель состоит в том, чтобы сопоставить значения в строке подмножества с соответствующими значениями в надмножестве, чтобы выходные данные объединяли подмножество и надмножество и соответствующие проценты подобия для каждого поля (токена).
- ContactID,
- LookupContactID,
- ФИО,
- LookupFullName,
- FullName_Similarity,
- Адрес,
- LookupAddress,
- Address_Similarity,
- Эл. адрес,
- LookupEmail,
- Email_Similarity
Чтобы сначала упростить и протестировать код, мы объединили строки и знаем, что код работает с очень маленьким надмножеством; однако, как только мы увеличиваем количество записей, он застревает. Мы пробовали разные алгоритмы, Левенштейна, FuzzyWuzzy и т. Д. Безрезультатно. Проблема, на мой взгляд, в том, что Python делает это строка за строкой; Однако я не уверен. Мы даже пытались запустить его на нашем кластере Hadoop с использованием потоковой передачи; однако, это не дало никаких положительных результатов.
#!/usr/bin/env python
import sys
from fuzzywuzzy import fuzz
import datetime
import time
import Levenshtein
#init for comparison
with open('normalized_set_record_set.csv') as normalized_records_ALL_file:
# with open('delete_this/xab') as normalized_records_ALL_file:
normalized_records_ALL_dict = {}
for line in normalized_records_ALL_file:
key, value = line.strip('\n').split(':', 1)
normalized_records_ALL_dict[key] = value
# normalized_records_ALL_dict[contact_id] = concat_record
def score_it_bag(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT sorted list by highest fuzzy match
'''
return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str))
for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1]
def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
# simply drop this index target_contact_id
df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x))
return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax()
def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
best_score = 100
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = Levenshtein.distance(target_str, comparison_record_str)
if current_score < best_score:
best_score = current_score
best_match_id = comparison_contactid
best_match_str = comparison_record_str
return (best_match_str, best_score, best_match_id)
def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
best_score = 0
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > best_score:
best_score = current_score
best_match_id = comparison_contactid
best_match_str = comparison_record_str
return (best_match_str, best_score, best_match_id)
def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
score_threshold = 95
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > score_threshold:
return (comparison_record_str, current_score, comparison_contactid)
return (None, None, None)
def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
threshold_score = 80
top_matches_list = []
#score it
#iterate through dictionary
for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > threshold_score:
top_matches_list.append((comparison_record_str, current_score, comparison_contactid))
if len(top_matches_list) > 0: return top_matches_list
def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
threshold_score = 80
#iterate through dictionary
for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
if target_contact_id != comparison_contactid:
#score it
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > threshold_score:
print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid))
pass
#stream in all contacts ie large set
for line in sys.stdin:
# ERROR DIAG TOOL
ts = time.time()
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
print >> sys.stderr, line, st
contact_id, target_str = line.strip().split(':', 1)
score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict)
# output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict))
# output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict))
# print contact_id + ':' + str(output)
1 ответ
Ваш подход требует, чтобы вы сделали 180 000 * 2 500 000 = 450 000 000 000 сравнений.
450 миллиардов это много.
Чтобы уменьшить количество сравнений, вы можете сначала сгруппировать записи, которые имеют некоторые общие черты, такие как первые пять символов поля адреса или общий токен. Затем сравнивайте только записи, имеющие общую функцию. Эта идея называется "блокировкой" и обычно уменьшает количество общих сравнений, которые вы должны сделать, до чего-то управляемого.
Общая проблема, которую вы пытаетесь решить, называется " связывание записей". Поскольку вы используете python, вы можете посмотреть на библиотеку дедупликации, которая обеспечивает комплексный подход (я являюсь автором этой библиотеки).