Как подключить блоки данных Azure к базе данных Cosmos DB с помощью API MongoDB?
Я создал одну учетную запись Azure CosmosDB, используя MongoDB API. Мне нужно подключить CosmosDB(API MongoDB) к кластеру Azure Databricks для чтения и записи данных из космоса.
Как подключить кластер Azure Databricks к учетной записи CosmosDB?
1 ответ
Вот фрагмент кода pyspark, который я использую для подключения к базе данных CosmosDB с помощью API MongoDB из блоков данных Azure (5.2 ML Beta (включает Apache Spark 2.4.0, Scala 2.11) и соединителя MongoDB: org.mongodb.spark:mongo-spark- соединитель_2.11:2.4.0):
from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.getOrCreate()
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("uri", CONNECTION_STRING) \
.load()
С CONNECTION_STRING, который выглядит так: "mongodb: // USERNAME: PASSWORD@testgp.documents.azure.com: 10255 /DATABASE_NAME.COLLECTION_NAME? Ssl=true&replicaSet=globaldb"
Я пробовал множество других вариантов (добавление имен баз данных и коллекций в качестве опции или конфигурации SparkSession), но безуспешно. Скажи мне, если это работает для тебя...
Вы пробовали разъем Spark? https://docs.databricks.com/spark/latest/data-sources/azure/cosmosdb-connector.html
Я использовал это для чтения / записи данных в прошлом без проблем - это должно работать независимо от API. Удаление данных является более сложным. Вам нужно использовать библиотеку Python.
После добавления org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
пакет, это сработало для меня:
import json
query = {
'$limit': 100,
}
query_config = {
'uri': 'myConnectionString'
'database': 'myDatabase',
'collection': 'myCollection',
'pipeline': json.dumps(query),
}
df = spark.read.format("com.mongodb.spark.sql") \
.options(**query_config) \
.load()
Я, однако, получаю эту ошибку с некоторыми коллекциями:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.6, executor 0): com.mongodb.MongoInternalException: The reply message length 10168676 is less than the maximum message length 4194304
Отвечая так же, как и на свой вопрос.
Используя MAVEN в качестве источника, я установил нужную библиотеку в свой кластер, используя путь
org.mongodb.spark:mongo-spark-connector_2.11:2.4.0
Искра 2.4
Пример кода, который я использовал, выглядит следующим образом (для тех, кто хочет попробовать):
# Read Configuration
readConfig = {
"URI": "<URI>",
"Database": "<database>",
"Collection": "<collection>",
"ReadingBatchSize" : "<batchSize>"
}
pipelineAccounts = "{'$sort' : {'account_contact': 1}}"
# Connect via azure-cosmosdb-spark to create Spark DataFrame
accountsTest = (spark.read.
format("com.mongodb.spark.sql").
options(**readConfig).
option("pipeline", pipelineAccounts).
load())
accountsTest.select("account_id").show()