С запросом Pushdown в spark, как получить параллелизм в spark-HBASE (BIGSQL как движок SQL)?

В Spark PushdownQuery обрабатывается SQL Engine БД, и в результате получается построенный массив данных. Итак, искра запросов к результатам этого запроса.

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""

val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()

Я могу видеть из другой ссылки ( https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x) в spark - mysql, параллелизм в запросе pushdown достигается путем запуска нескольких запрос основан на аргументах numPartitions & partitionColumn. Это очень похоже на подход к распределению sqoop. скажем для приведенного выше примера аргумента аргумента numPartitions = 4; partitionColumn = COUNTRY_CODE и в нашей таблице диапазон значений COUNTRY_CODE падает на (000,999).

4 запроса строятся; запускается в БД, и из этих результатов строится датафрейм (с параллелизмом 4 в этом случае).

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999

Вопрос, который у меня сейчас возникает, заключается в том, как можно достичь параллелизма с помощью этого метода в spark (версия 2.1) + hbase (механизм запросов - BIGSQL)? это не дает мне параллелизма прямо сейчас. Нужно ли обновлять драйверы для этого моста в spark-hbase? или искра должна сделать это? или какие изменения помогают ему это получить? какое-то направление в этом мне помогает. Спасибо!

1 ответ

Решение

Чтобы добиться максимальной производительности, я бы порекомендовал начать свою работу с помощью --num-executors 4 и --executor-cores 1, так как соединение jdbc является однопоточным, одна задача выполняется на одном ядре на запрос. Внося это изменение конфигурации, когда ваша работа выполняется, вы можете наблюдать за параллельными задачами, которые являются ядром каждого исполнителя.

Вместо этого используйте функцию ниже:

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") //fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""

val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 4L, 4, connectionProperties)

См. Https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String, LowerBound: Длинные, UpperBound: Длинные,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame

Другие вопросы по тегам