Искровое скала эффективное парное сравнение с блокировкой
Учитывая данные потока кликов, содержащие cookieID, IP-адрес, географическую информацию, я пытаюсь вычислить сходство Jaccard (или любые другие меры сходства) ч / б всех возможных пар cookie, основанных на наборе IP-адресов, агрегированных на основе cookieID, и прогнозировать пары файлов cookie, чье сходство превышает пороговое значение, как файлы cookie от тех же лиц. Чуть более формально cookieID1 и cookieID2 принадлежат одному и тому же пользователю, если sim((cookieID1, [IP_1, IP_2, ..., IP_n1]), (cookieID2, [IP_1, IP_2, ..., IP_n2])) > 0,9, не иначе.
Однако существует несколько миллиардов cookieID, которые делают эту задачу очень сложной. Одним из возможных решений является группирование их в непересекающиеся блоки и рассмотрение схожих объектов, сгруппированных в блоки, и попарные сравнения выполняются только между совместно используемыми cookie-файлами за счет некоторых пропущенных совпадений. На данный момент я делю их на блоки в соответствии с почтовым индексом, предполагая, что одни и те же файлы cookie возникли в одном и том же месте.
Тем не менее, из-за масштаба нашего набора данных, это все еще занимает больше суток, что должно быть более эффективным. Был бы очень признателен, если бы вы могли дать мне несколько комментариев или поделиться своим предыдущим опытом. Вот мои коды, которые работают в Spark Scala.
val data = input.select("timestamp", "userID", "IP", "zipCode")
val aggData = data
.groupBy("userID", "zipCode")
.agg(max("timestamp").as("maxT"), min("timestamp").as("minT"), collect_set("IP").as("IPs"))
.withColumn("lifetime", 'maxT - 'minT)
val aggData2 = aggData
.groupBy("userID")
.agg(max(struct("lifetime", "zipCode")).alias("max"), union("IPs").as("IPs"))
.withColumn($"max.zipCode", "blockingKey")
val joinData = aggData2.as("a")
.join(aggData2.as("b"), $"a.blockingKey" === $"b.blockingKey" && $"a.userID" > $"b.userID")
.repartition($"zipCode")
.withColumn("sim", JaccardUtils.jaccardUdf($"a.IPs", $"b.IPs"))
.where($"sim" >= 0.75)
.select($"a.aamuuidIndex".as("idA"), $"b.aamuuidIndex".as("idB"))
Обратите внимание, что union() - это пользовательская функция, которая объединяет IP-наборы.