Spark - объединение структурированного потока со случайно обновленными наборами данных из файла

У меня есть один поток, в котором мне нужно выполнить агрегацию с помощью оконного окна времени события, водяного знака, а также хочу обогатить его некоторыми другими наборами данных посредством объединения. Эти обогащающие наборы данных взяты из файлов и случайно обновляются / добавляются (когда говорят "случайно", я имел в виду, что они не обновляются / обновляются периодически).

Допустим, у моего основного потока A есть поля (timestamp, item_id, value). Для краткости, скажем, у меня есть 1 набор данных ref: B, который имеет поля (timestamp, item_id, item_name). В A для каждого элемента у меня периодически появляются новые данные каждый час. Тем не менее, B содержит административные данные, поэтому иногда обновляется два раза в день, но иногда один раз в неделю. Требуется выполнить агрегацию для A.value, как только новые данные поступят в A, и обогатить их с помощью B.item_name.

Я попробовал несколько подходов, но все не удалось:

Подход 1. Поддерживать B как набор данных отображения с использованием Stateful MapGroupWithState <<< это невозможно, так как мне нужен "обновленный" режим при записи данных, и мне также нужно выполнить агрегацию перед выполнением MapGroupWithState (я еще не выяснил, как это сделать agg after MapGroupWithState)

Подход 2: Получить последние данные из B на основе временных меток, что-то вроде:

val B_latest = B.filter($"d_time" = B.agg(max("d_time")).take(1)(0).get(0))
A.join(B_latest, Seq("item_id"), "inner")

Но с этим я получил AnalysisException: запросы с потоковыми источниками должны выполняться с помощью writeStream.start (), что, я думаю, вызвано этой функцией agg_max.

Любое решение для меня по этому поводу?

Большое спасибо.

0 ответов

Другие вопросы по тегам