Как запускать параллельные потоки в AWS Glue PySpark?
У меня есть искровая работа, которая просто извлекает данные из нескольких таблиц с одинаковыми преобразованиями. В основном цикл for, который выполняет итерацию по списку таблиц, запрашивает таблицу каталога, добавляет временную метку, а затем отправляет в Redshift (пример ниже).
На выполнение этой работы уйдет около 30 минут. Есть ли способ запустить их параллельно в одном контексте искры / клея? Я не хочу создавать отдельные работы с клеем, если я могу этого избежать.
import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *
# query the runtime arguments
args = getResolvedOptions(
sys.argv,
["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)
# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()
tables = []
for table in tables:
catalog_table = glueContext.create_dynamic_frame.from_catalog(
database="test", table_name=table, transformation_ctx=table
)
data_set = catalog_table.toDF().withColumn(
"batchLoadTimestamp", lit(job_execution_timestamp)
)
# covert back to glue dynamic frame
export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")
# remove null rows from dynamic frame
non_null_records = DropNullFields.apply(
frame=export_frame, transformation_ctx="non_null_records"
)
temp_dir = os.path.join(args["TempDir"], redshift_table_name)
stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=non_null_records,
catalog_connection=args["redshift_catalog_connection"],
connection_options={
"dbtable": f"{args['target_schema']}.{redshift_table_name}",
"database": args["target_database"],
"preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
},
redshift_tmp_dir=temp_dir,
transformation_ctx="stores_redshiftSink",
) ```
2 ответа
Вы можете сделать следующее, чтобы ускорить этот процесс
- Разрешить одновременное выполнение задания.
- Выделите достаточное количество DPU.
- Передайте список таблиц в качестве параметра
- Выполняйте задание параллельно, используя рабочие процессы Glue или пошаговые функции.
Теперь предположим, что у вас есть 100 таблиц для приема, вы можете разделить список на 10 таблиц в каждой и запустить задание одновременно 10 раз.
Поскольку ваши данные будут загружаться параллельно, время выполнения задания Glue будет уменьшено, что приведет к меньшим затратам.
Альтернативный подход, который будет намного быстрее, - это использовать прямую утилиту красного смещения.
- Создайте таблицу в красном смещении и оставьте столбец batchLoadTimestamp по умолчанию для current_timestamp.
- Теперь создайте команду копирования и загрузите данные в таблицу прямо из s3.
- Запустите команду копирования, используя задание оболочки Glue Python с использованием pg8000.
Почему этот подход будет быстрее?? Поскольку соединитель jdbc с красным смещением искры сначала выгружает фрейм данных искры в s3, а затем подготавливает команду копирования в таблицу красного смещения. И при запуске команды копирования напрямую вы удаляете накладные расходы на выполнение команды выгрузки, а также считываете данные в искру df.
В дополнение к вышеупомянутым подходам, чтение из таблиц JDBC в параллельном режиме значительно ускорит загрузку данных в DynamicFrames. https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html