Live Tables Delta с EventHub
Я пытаюсь создать потоковую передачу из eventhub, используя дельта-таблицы live, но у меня возникают проблемы с установкой библиотеки. Можно ли установить библиотеку maven с использованием таблиц Delta Live с помощью sh/pip?
Я хочу установить com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17
2 ответа
В настоящее время невозможно использовать внешние коннекторы/библиотеки Java для таблиц Delta Live Tables. Но для EventHub есть обходной путь — вы можете подключиться к EventHub с помощью встроенного коннектора Kafka — вам просто нужно указать правильные параметры, как это описано в документации :
@dlt.table
def eventhubs():
readConnectionString="Endpoint=sb://<....>.windows.net/;?.."
eh_sasl = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{readConnectionString}";'
kafka_options = {
"kafka.bootstrap.servers": "<eh-ns-name>.servicebus.windows.net:9093",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "30000",
"startingOffsets": "earliest",
"kafka.sasl.jaas.config": eh_sasl,
"subscribe": "<topic-name>",
}
return spark.readStream.format("kafka") \
.options(**kafka_options).load()
Настройка конвейера DLT с использованием концентратора событий Azure в качестве источника. Python для бронзовой таблицы, которая читает из концентратора событий. Sql для серебряных и золотых таблиц:
В Центре событий я отправляю следующий json: Убедитесь, что это не список.
{
"id": "2",
"name": "xyz1"
}
Обратитесь к документации Databricks, чтобы узнать, как настроить бронзовый слой в Python.
Блокнот Python
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = "xyz-eventhub1"
EH_NAME = "abc"
EH_CONN_SHARED_ACCESS_KEY_NAME = "RootManageSharedAccessKey"
# SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = "xyz="
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : "60000",
"kafka.session.timeout.ms" : "60000",
# "maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : "false",
"startingOffsets" : "earliest"
}
# PAYLOAD SCHEMA
payload_ddl = """id STRING, name STRING"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
print(df)
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
# .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("id", expr("parsed_records.id"))
.withColumn("name", expr("parsed_records.name"))
.withColumn("eh_enqueued_timestamp", expr("timestamp")) # when event was enqueued
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("bronze_timestamp", col("current_timestamp"))
.withColumn("bronze_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw events from kafka",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def kafka_bronze():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Блокнот SQL
Серебряный стол
CREATE STREAMING LIVE TABLE kafka_cleaned(
CONSTRAINT id_not_null EXPECT (id IS NOT NULL)
)
COMMENT "Cleaned kafka table"
TBLPROPERTIES ("companyPipeline.quality" = "silver")
AS
SELECT
cast(id as int) as id,
name,
eh_enqueued_timestamp,
bronze_timestamp,
CURRENT_TIMESTAMP as silver_timestamp
FROM STREAM(LIVE.kafka_bronze)
Золотой стол
CREATE STREAMING LIVE TABLE kafka_gold
COMMENT "count of id per name"
TBLPROPERTIES ("companyPipeline.quality" = "gold")
AS
SELECT count(id) as count_id,
name,
CURRENT_TIMESTAMP as gold_timestamp
FROM STREAM(LIVE.kafka_cleaned)
group by name
Используйте оба этих блокнота в качестве источника в конвейере DLT.