Не удается найти / получить доступ к сохраненным таблицам в потоке foreach
Я пытаюсь сохранить данные из данных в таблицу
def SaveData(row):
...
# read csv string
df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\\N") \
.schema(schemaMapping) \
.csv(csvData)
df.write.format("delta").mode("append").save(tableLocation)
#df.write.saveAsTable(tableName)
#df.saveAsTable(tableName, format='parquet', mode='append')
query = dfDEHubStream.writeStream.foreach(SaveData).start()
Мне удалось сделать несколько локальных тестов, и df.write.saveAsTable отлично подошел со статическими данными. Однако, когда я вхожу в потоковую передачу и пытаюсь сохранить данные во время foreach, по какой-то причине данные не отображаются на вкладке "Данные" блока данных.
Я знаю, что он где-то экономит, потому что, когда я удаляю опцию "добавить", через некоторое время он потерпит неудачу, говоря, что другая таблица с таким же именем уже существует в том же месте.
Я пытаюсь это понять!
- Где это находится?
- Как я могу найти это?
- Почему он не сохраняется в таблице на вкладке "Данные", как статические данные?
Вот что я увидел:
"Таблицы, созданные с указанным МЕСТОПОЛОЖЕНИЕМ, считаются неуправляемыми в метастазах".
Итак, чтобы решить это, я должен сделать код ниже:
CREATE TABLE tableName
USING DELTA
LOCATION tableLocation
Эта функциональность может быть использована для "импорта" данных в метасторское хранилище.
Ну, это не сработало, как я ожидал. После некоторой обработки кирпичи данных дают мне исключение ниже:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException:
org.apache.spark.sql.AnalysisException: You are trying to create an external
table `default`.`dbo_pedidos` from `/delta/dbo_pedidos` using Databricks
Delta, but the schema is not specified when the input path is empty.
Так что я действительно в замешательстве. Что мне здесь не хватает?