Spark - объединение структурированного потока со случайно обновленными наборами данных из файла
У меня есть один поток, в котором мне нужно выполнить агрегацию с помощью оконного окна времени события, водяного знака, а также хочу обогатить его некоторыми другими наборами данных посредством объединения. Эти обогащающие наборы данных взяты из файлов и случайно обновляются / добавляются (когда говорят "случайно", я имел в виду, что они не обновляются / обновляются периодически).
Допустим, у моего основного потока A есть поля (timestamp, item_id, value). Для краткости, скажем, у меня есть 1 набор данных ref: B, который имеет поля (timestamp, item_id, item_name). В A для каждого элемента у меня периодически появляются новые данные каждый час. Тем не менее, B содержит административные данные, поэтому иногда обновляется два раза в день, но иногда один раз в неделю. Требуется выполнить агрегацию для A.value, как только новые данные поступят в A, и обогатить их с помощью B.item_name.
Я попробовал несколько подходов, но все не удалось:
Подход 1. Поддерживать B как набор данных отображения с использованием Stateful MapGroupWithState <<< это невозможно, так как мне нужен "обновленный" режим при записи данных, и мне также нужно выполнить агрегацию перед выполнением MapGroupWithState (я еще не выяснил, как это сделать agg after MapGroupWithState)
Подход 2: Получить последние данные из B на основе временных меток, что-то вроде:
val B_latest = B.filter($"d_time" = B.agg(max("d_time")).take(1)(0).get(0))
A.join(B_latest, Seq("item_id"), "inner")
Но с этим я получил AnalysisException: запросы с потоковыми источниками должны выполняться с помощью writeStream.start (), что, я думаю, вызвано этой функцией agg_max.
Любое решение для меня по этому поводу?
Большое спасибо.