Как использовать подзапрос для опции dbtable в источнике данных jdbc?

Я хочу использовать Spark для обработки некоторых данных из источника JDBC. Но для начала, вместо чтения исходных таблиц из JDBC, я хочу выполнить некоторые запросы на стороне JDBC для фильтрации столбцов и объединения таблиц и загрузки результата запроса в виде таблицы в Spark SQL.

У меня работает следующий синтаксис для загрузки необработанной таблицы JDBC:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="mydb.table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver" # mysql JDBC driver 5.1.41
).load() 
df_table1.show() # succeeded

Согласно документации Spark (я использую PySpark 1.6.3):

dbtable: таблица JDBC, которую следует прочитать. Обратите внимание, что все, что является допустимым в предложении FROM SQL-запроса, может быть использовано. Например, вместо полной таблицы вы также можете использовать подзапрос в скобках.

Просто для эксперимента я попробовал что-то простое, например:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="(SELECT * FROM mydb.table1) AS table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver"
).load() # failed

Это бросило следующее исключение:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table1 WHERE 1=0' at line 1

Я также попробовал несколько других вариантов синтаксиса (добавить / удалить скобки, удалить предложение "как", переключить регистр и т. Д.) Без какой-либо удачи. Так какой будет правильный синтаксис? Где я могу найти более подробную документацию по синтаксису? Кроме того, откуда взялся этот странный "ГДЕ 1=0" в сообщении об ошибке? Спасибо!

3 ответа

Для чтения данных из источника JDBC с помощью SQL-запроса в Spark SQL вы можете попробовать что-то вроде этого:

val df_table1 = sqlContext.read.format("jdbc").options(Map(
    ("url" -> "jdbc:postgresql://localhost:5432/mydb"),
    ("dbtable" -> "(select * from table1) as table1"),
    ("user" -> "me"),
    ("password" -> "******"),
    ("driver" -> "org.postgresql.Driver"))
).load()

Я попробовал это с помощью PostgreSQL. Вы можете изменить его в соответствии с MySQL,

table = "(SELECT id, person, manager, CAST(tdate AS CHAR) AS tdate, CAST(start AS   CHAR) AS start, CAST(end AS CHAR) as end, CAST(duration AS CHAR) AS duration FROM EmployeeTimes) AS EmployeeTimes",

spark = get_spark_session()
df = spark.read.format("jdbc"). \
    options(url=ip,
            driver='com.mysql.jdbc.Driver',
            dbtable=table,
            user=username,
            password=password).load()
return df

У меня было много проблем с несовместимостью Spark JDBC с временными метками MYSQL. Хитрость заключается в том, чтобы преобразовать все ваши метки времени или значения продолжительности в строку до того, как JDBC коснется их. Просто приведите ваши значения в виде строк, и это будет работать.

Примечание. Вам также нужно будет использовать AS, чтобы дать запросу псевдоним, чтобы он работал.

С подключением Spark 2.2 на Python к MySQL (5.7.19) я могу запустить следующее при использовании table="(SELECT * FROM a_table) AS my_table",

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder.appName("myApp") \
    .config("jars", "/usr/local/spark-2.2.2-bin-hadoop2.7/jars/mysql-connector-java-5.1.45-bin.jar") \
    .getOrCreate()

my_df = my_spark.read.jdbc(
    url="jdbc:mysql://my_host:3306/my_db",
    table="(SELECT * FROM a_table) AS my_table",
    properties=['user': 'my_username', 'password': 'my_password'}
)

my_df.head(20)

Я думаю, что это может быть ошибка в Spark SQL.

Кажется, что или эта или эта строка дает вам ошибку. Оба используют интерполяцию строки Scala для замены table с dbtable,

s"SELECT * FROM $table WHERE 1=0"

Вот где вы можете найти table1 WHERE 1=0 из ошибки, с которой вы столкнулись, так как приведенный выше шаблон станет:

SELECT * FROM (select * from table1) as table1 WHERE 1=0

который выглядит неправильно.

Существует действительно MySQL-специфический диалект - MySQLDialect, который переопределяет getTableExistsQuery со своим:

override def getTableExistsQuery(table: String): String = {
  s"SELECT 1 FROM $table LIMIT 1"
}

поэтому я уверен, что источником ошибки является другой метод getSchemaQuery. Это маловероятно, если вы используете Spark 1.6.3, в то время как метод @Since("2.1.0") маркер.

Я настоятельно рекомендую проверить журналы базы данных MySQL и посмотреть, какой запрос выполняется, что приводит к сообщению об ошибке.

Другие вопросы по тегам