PySpark SQL LATERAL VIEW EXPLODE with schema

I am trying to use a lateral view explode in a Spark SQL query via pyspark, to select values from the 'init' column of my input. A sample input record looks like this:

{
 sequence_number={"n":"3"}, 
 page_id={"s":"ab313c97-cb92-476a-b011-bd7267248018"}, 
 init={
       "s":"[{\"type\":4,\"id\":\"LD_TOKEN\",\"name\":\"LDID\",\"fieldvisibility\":{\"boxWidth\":0,\"boxHeight\":0,\"parentsDisplayed\":true,\"childrenDisplayed\":false},\"element_id\":0},{\"type\":3,\"id\":\"F-ZIP\",\"name\":\"ZIP\",\"fieldvisibility\":{\"boxWidth\":165,\"boxHeight\":46,\"parentsDisplayed\":true,\"childrenDisplayed\":true},\"element_id\":1}]"
      }, 
 client_time={"n":"1504629342081"}, 
 created={"n":"1504629378.2508"}, 
 http_X-Forwarded-For={"s":"66.249.92.154, 10.3.5.223"}, 
 http_User-Agent={"s":"Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1 (compatible; AdsBot-Google-Mobile; +http://www.google.com/mobile/adsbot.html)"}, 
 http_Content-Length={"n":"919"}, 
 token={"s":"680796B7-4F38-EAB2-945A-13DC0D634587"}, 
 execution_time={"n":"3"}
 }

My pyspark transformation, using a SQL query, looks like this:

 formtbl = src_df.createOrReplaceTempView("formdata")
 sel_data = spark.sql("SELECT GET_JSON_OBJECT(item['token'], '$.s') as token " 
                 "FROM formdata "
                 "LATERAL VIEW OUTER EXPLODE(FROM_JSON(GET_JSON_OBJECT(item['init'], '$.s'), 'array<StructType<type:string>>')) initTable AS initjson "
                 "where get_json_object(item['init'], '$.s') is not null"
                 )

The query above gives me an error about my from_json schema definition. Simply exploding the 'init' column fails because it is a string in the input, and explode does not accept a string as input; it needs a map or an array, so I am trying to parse the string with from_json first. The same query works fine in HiveQL.
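
Note: from the parse error below, it looks like the schema string itself may be the problem. 'StructType' is the name of the API class, not a keyword in Spark's DDL schema syntax, which I believe expects lowercase 'struct<...>'. A minimal sketch of the query with that change (untested; assumes Spark 2.2+, where FROM_JSON is usable in SQL, and I also select 'initjson.type' just to check the exploded struct):

    sel_data = spark.sql(
        "SELECT GET_JSON_OBJECT(item['token'], '$.s') AS token, initjson.type "
        "FROM formdata "
        # 'struct' is the DDL keyword; 'StructType' is the API class name,
        # which the SQL schema parser rejects with "extraneous input '<'"
        "LATERAL VIEW OUTER EXPLODE(FROM_JSON(GET_JSON_OBJECT(item['init'], '$.s'), "
        "'array<struct<type:string>>')) initTable AS initjson "
        "WHERE GET_JSON_OBJECT(item['init'], '$.s') IS NOT NULL"
    )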

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o22.sql.
: org.apache.spark.sql.AnalysisException: 
extraneous input '<' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', ....IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 5)

 == SQL ==
 array<StructType<type:string>>
 -----^^^
; line 1 pos 95
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7.apply(FunctionRegistry.scala:494)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7.apply(FunctionRegistry.scala:473)
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:91)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1181)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1183)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1183)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7.applyOrElse(Analyzer.scala:1182)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7.applyOrElse(Analyzer.scala:1170)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1170)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1168)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1168)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1167)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-75-9ba3d7cd3f63> in <module>()
      1 formtbl = src_df.createOrReplaceTempView("formdata")
      2 
----> 3 sel_data = spark.sql("SELECT GET_JSON_OBJECT(item['token'], '$.s') as token "
      4                      "FROM formdata "
      5                      "LATERAL VIEW OUTER EXPLODE(FROM_JSON(GET_JSON_OBJECT(item['init'], '$.s'), 'array<StructType<type:string>>')) initTable AS initjson "

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/session.py in sql(self, sqlQuery)
    554         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    555         """
--> 556         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    557 
    558     @since(2.0)

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "\nextraneous input '<' expecting {'SELECT',  (line 1, pos 5)\n\n== SQL ==\narray<StructType<type:string>>\n-----^^^\n; line 1 pos 95"

What is wrong with my syntax? Could someone please help me with this?
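
For reference, here is the DataFrame-API equivalent I would expect to behave the same way, with the schema built from the API classes instead of a DDL string (a sketch, untested; assumes Spark 2.3+, where explode_outer is available in pyspark and from_json accepts an array-of-structs schema):

    from pyspark.sql.functions import col, explode_outer, from_json, get_json_object
    from pyspark.sql.types import ArrayType, StringType, StructField, StructType

    # Same shape as 'array<struct<type:string>>', built with the API classes
    init_schema = ArrayType(StructType([StructField("type", StringType())]))

    sel_data = (
        src_df
        # parse the JSON string held in item['init'].s, then explode the array
        .withColumn("initjson",
                    explode_outer(from_json(get_json_object(col("item")["init"], "$.s"),
                                            init_schema)))
        .select(get_json_object(col("item")["token"], "$.s").alias("token"),
                col("initjson.type"))
    )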
