PySpark SQL LATERAL VIEW explode with schema
I am trying to use LATERAL VIEW explode in a Spark SQL query through PySpark, in order to select values from the 'init' column of my input. A sample record looks like this:
{
sequence_number={"n":"3"},
page_id={"s":"ab313c97-cb92-476a-b011-bd7267248018"},
init={
"s":"[{\"type\":4,\"id\":\"LD_TOKEN\",\"name\":\"LDID\",\"fieldvisibility\":{\"boxWidth\":0,\"boxHeight\":0,\"parentsDisplayed\":true,\"childrenDisplayed\":false},\"element_id\":0},{\"type\":3,\"id\":\"F-ZIP\",\"name\":\"ZIP\",\"fieldvisibility\":{\"boxWidth\":165,\"boxHeight\":46,\"parentsDisplayed\":true,\"childrenDisplayed\":true},\"element_id\":1}]"
},
client_time={"n":"1504629342081"},
created={"n":"1504629378.2508"},
http_X-Forwarded-For={"s":"66.249.92.154, 10.3.5.223"},
http_User-Agent={"s":"Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1 (compatible; AdsBot-Google-Mobile; +http://www.google.com/mobile/adsbot.html)"},
http_Content-Length={"n":"919"},
token={"s":"680796B7-4F38-EAB2-945A-13DC0D634587"}, execution_time={"n":"3"}
}
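Note that the value of init.s is itself a JSON array serialized as a string. Reformatted for readability, it deserializes to:

[
  {"type": 4, "id": "LD_TOKEN", "name": "LDID", "fieldvisibility": {"boxWidth": 0, "boxHeight": 0, "parentsDisplayed": true, "childrenDisplayed": false}, "element_id": 0},
  {"type": 3, "id": "F-ZIP", "name": "ZIP", "fieldvisibility": {"boxWidth": 165, "boxHeight": 46, "parentsDisplayed": true, "childrenDisplayed": true}, "element_id": 1}
]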
My PySpark transformation, using a SQL query, looks like this:
formtbl = src_df.createOrReplaceTempView("formdata")
sel_data = spark.sql("SELECT GET_JSON_OBJECT(item['token'], '$.s') as token "
"FROM formdata "
"LATERAL VIEW OUTER EXPLODE(FROM_JSON(GET_JSON_OBJECT(item['init'], '$.s'), 'array<StructType<type:string>>')) initTable AS initjson "
"where get_json_object(item['init'], '$.s') is not null"
)
The query above fails with an error on my from_json schema definition. Simply exploding the 'init' column is not an option, because its value in the input is a string, and explode does not accept a string as input; it needs a map or an array. That is why I first parse the string with from_json. The same query works fine in HiveQL.
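To show what I mean about explode's input type, here is a toy sketch of the same pattern via the DataFrame API (hypothetical, hard-coded data standing in for my real input, and assuming the existing spark session):

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# A one-column toy frame whose value mimics the serialized init.s array.
toy = spark.createDataFrame([('[{"type":"4"},{"type":"3"}]',)], ["init_s"])

# explode() rejects a plain string column, but it accepts the array of structs
# produced by from_json; this is the behaviour I am trying to reproduce in SQL.
schema = ArrayType(StructType([StructField("type", StringType())]))
toy.select(explode(from_json(toy["init_s"], schema)).alias("initjson")).show()

The full error from the real query is below.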
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:
Py4JJavaError: An error occurred while calling o22.sql.
: org.apache.spark.sql.AnalysisException:
extraneous input '<' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', ....IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 5)
== SQL ==
array<StructType<type:string>>
-----^^^
; line 1 pos 95
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7.apply(FunctionRegistry.scala:494)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$7.apply(FunctionRegistry.scala:473)
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:91)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1181)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1183)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1183)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7.applyOrElse(Analyzer.scala:1182)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7.applyOrElse(Analyzer.scala:1170)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1170)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16.applyOrElse(Analyzer.scala:1168)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:59)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1168)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:1167)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException                         Traceback (most recent call last)
<ipython-input-75-9ba3d7cd3f63> in <module>()
      1 formtbl = src_df.createOrReplaceTempView("formdata")
      2 
----> 3 sel_data = spark.sql("SELECT GET_JSON_OBJECT(item['token'], '$.s') as token "
      4                      "FROM formdata "
      5                      "LATERAL VIEW OUTER EXPLODE(FROM_JSON(GET_JSON_OBJECT(item['init'], '$.s'), 'array<StructType<type:string>>')) initTable AS initjson "

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/session.py in sql(self, sqlQuery)
    554         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    555         """
--> 556         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    557 
    558     @since(2.0)

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "\nextraneous input '<' expecting {'SELECT', ... (line 1, pos 5)\n\n== SQL ==\narray<StructType<type:string>>\n-----^^^\n; line 1 pos 95"
What is wrong with my syntax? Could someone please help me with this?
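For what it is worth, the DataFrame-API version I am experimenting with as a workaround builds the schema from pyspark.sql.types objects instead of a schema string (a sketch, untested against the full data; I only model the 'type' field here):

from pyspark.sql.functions import explode_outer, from_json, get_json_object
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Parse the serialized init.s array with an explicit schema, then explode it;
# explode_outer mirrors LATERAL VIEW OUTER EXPLODE by keeping rows whose
# parsed array is NULL or empty.
init_schema = ArrayType(StructType([StructField("type", StringType())]))
sel_data = (
    src_df
    .where(get_json_object(src_df["item"]["init"], "$.s").isNotNull())
    .select(
        get_json_object(src_df["item"]["token"], "$.s").alias("token"),
        explode_outer(from_json(get_json_object(src_df["item"]["init"], "$.s"),
                                init_schema)).alias("initjson"),
    )
)

I would still prefer to keep this in SQL, so any pointer on the schema-string syntax expected by FROM_JSON is appreciated.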