Ошибка потока DLT. Запросы с источниками потоковой передачи должны выполняться с помощью writeStream.start();

Я пытаюсь проанализировать входящие записи потоков переменной длины в блоках данных с помощью таблиц Delta Live Tables. Я получаю сообщение об ошибке:

Запросы с потоковыми источниками должны выполняться с помощью writeStream.start();

Код ноутбука

      @dlt.table (
    comment="xAudit Parsed"
)
def b_table_parsed():
    df = dlt.readStream("dlt_table_raw_view")
          
    for i in range(df.select(F.max(F.size('split_col'))).collect()[0][0]):
        df = df.withColumn("col"+str(i),df["split_col"][i])
    
    df = (df
          .drop("value","split_col")
         )
      
    return df

Все это отлично работает с фактическими исходными текстовыми файлами или дельта-таблицей с использованием интерактивного кластера, но когда я помещаю его в DLT и источник передает файлы из автозагрузчика, ему это не нравится. Я предполагаю, что это связано с потоком.

Я видел другой пост об использовании .foreach, возможно, но он использовал writeStream и не уверен, могу ли я или как его использовать для возврата в таблицу DLT, или есть ли другое решение.

Я очень новичок в python, потоковой передаче и DLT, поэтому был бы признателен, если бы кто-нибудь мог рассказать мне подробное решение.

Попытка проанализировать строки переменной длины в источнике потоковой передачи с помощью записной книжки с дельта-таблицами в блоках данных. Работает в интерактивном кластере, но не работает в DLT.

1 ответ

Проблема в этом фрагменте кода:df.select(F.max(F.size('split_col'))).collect()[0][0]- вы пытаетесь найти максимум + собрать его из потока, который по определению не имеет начала и конца. Ваш код, скорее всего, работает с пакетным DF или внутри функции, вызываемой из.foreachBatchэто не поддерживается DLT.

Другие вопросы по тегам