Не удается найти / получить доступ к сохраненным таблицам в потоке foreach

Я пытаюсь сохранить данные из данных в таблицу

def SaveData(row):

  ...

  # read csv string
  df = spark.read \
  .option("header", True) \
  .option("delimiter","|") \
  .option("quote", "\"") \
  .option("nullValue", "\\N") \
  .schema(schemaMapping) \
  .csv(csvData)

  df.write.format("delta").mode("append").save(tableLocation)
  #df.write.saveAsTable(tableName)
  #df.saveAsTable(tableName, format='parquet', mode='append')  

query = dfDEHubStream.writeStream.foreach(SaveData).start()

Мне удалось сделать несколько локальных тестов, и df.write.saveAsTable отлично подошел со статическими данными. Однако, когда я вхожу в потоковую передачу и пытаюсь сохранить данные во время foreach, по какой-то причине данные не отображаются на вкладке "Данные" блока данных.

Я знаю, что он где-то экономит, потому что, когда я удаляю опцию "добавить", через некоторое время он потерпит неудачу, говоря, что другая таблица с таким же именем уже существует в том же месте.

Я пытаюсь это понять!

  • Где это находится?
  • Как я могу найти это?
  • Почему он не сохраняется в таблице на вкладке "Данные", как статические данные?

Вот что я увидел:

"Таблицы, созданные с указанным МЕСТОПОЛОЖЕНИЕМ, считаются неуправляемыми в метастазах".

Итак, чтобы решить это, я должен сделать код ниже:

CREATE TABLE tableName
USING DELTA
LOCATION tableLocation

Эта функциональность может быть использована для "импорта" данных в метасторское хранилище.

Ну, это не сработало, как я ожидал. После некоторой обработки кирпичи данных дают мне исключение ниже:

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException:
org.apache.spark.sql.AnalysisException: You are trying to create an external
table `default`.`dbo_pedidos` from `/delta/dbo_pedidos` using Databricks
Delta, but the schema is not specified when the input path is empty.

Так что я действительно в замешательстве. Что мне здесь не хватает?

0 ответов

Другие вопросы по тегам