Ошибка потока DLT. Запросы с источниками потоковой передачи должны выполняться с помощью writeStream.start();
Я пытаюсь проанализировать входящие записи потоков переменной длины в блоках данных с помощью таблиц Delta Live Tables. Я получаю сообщение об ошибке:
Запросы с потоковыми источниками должны выполняться с помощью writeStream.start();
Код ноутбука
@dlt.table (
comment="xAudit Parsed"
)
def b_table_parsed():
df = dlt.readStream("dlt_table_raw_view")
for i in range(df.select(F.max(F.size('split_col'))).collect()[0][0]):
df = df.withColumn("col"+str(i),df["split_col"][i])
df = (df
.drop("value","split_col")
)
return df
Все это отлично работает с фактическими исходными текстовыми файлами или дельта-таблицей с использованием интерактивного кластера, но когда я помещаю его в DLT и источник передает файлы из автозагрузчика, ему это не нравится. Я предполагаю, что это связано с потоком.
Я видел другой пост об использовании .foreach, возможно, но он использовал writeStream и не уверен, могу ли я или как его использовать для возврата в таблицу DLT, или есть ли другое решение.
Я очень новичок в python, потоковой передаче и DLT, поэтому был бы признателен, если бы кто-нибудь мог рассказать мне подробное решение.
Попытка проанализировать строки переменной длины в источнике потоковой передачи с помощью записной книжки с дельта-таблицами в блоках данных. Работает в интерактивном кластере, но не работает в DLT.
1 ответ
Проблема в этом фрагменте кода:df.select(F.max(F.size('split_col'))).collect()[0][0]
- вы пытаетесь найти максимум + собрать его из потока, который по определению не имеет начала и конца. Ваш код, скорее всего, работает с пакетным DF или внутри функции, вызываемой из.foreachBatch
это не поддерживается DLT.