我应该如何将 Spark SQL DataFrame 作为参数传递给 Python 函数?
How should I pass a Spark SQL DataFrame as an argument in Python function?
我在 Jupyter notebook 中有一个 Spark SQL DataFrame output_df1
。我想定义一个函数如下:
def output_agg(output_table_1):
output_agg_1 = spark.sql(f"""
select * from {output_table_1}
""")
return output_agg_1
当我调用 output_agg(output_df1)
时,出现以下错误:
Py4JJavaError Traceback (most recent call last)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o110.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '[' expecting <EOF>
你能帮我看看正确的语法吗?
在传递给 spark.sql
之前打印 SQL 查询并检查 SQL 查询是否正常。另外,分享有问题的 SQL 查询。
def output_agg(output_table_1):
query = f"""select * from {output_table_1}"""
print(query)
output_agg_1 = spark.sql(query)
return output_agg_1
如果 SQL 查询看起来不错,则可能的问题可能是 table 未在 spark 中注册。
正如评论中所讨论的,既然你想联合多个dfs,你可以这样做
from functools import reduce
from pyspark.sql import DataFrame
dfs_list = [output_df1, output_df2, output_df3, output_df4]
df_combined = reduce(DataFrame.unionAll, dfs_list)
注意:确保所有 df 中的列顺序相同
spark sql select
需要给 table/temporary table。首先将dataframe注册为临时table,然后执行SQL语句。
output_df1.createOrReplaceTempView('output_table')
def output_agg(output_table_1):
output_agg_1 = spark.sql(f"""
select * from {output_table_1}
""")
return output_agg_1
output_agg('output_table')
我无法对已完成的问题添加评论。学习SparkSQL的资料是SparkSQL参考。 https://spark.apache.org/docs/latest/sql-ref.html
我在 Jupyter notebook 中有一个 Spark SQL DataFrame output_df1
。我想定义一个函数如下:
def output_agg(output_table_1):
output_agg_1 = spark.sql(f"""
select * from {output_table_1}
""")
return output_agg_1
当我调用 output_agg(output_df1)
时,出现以下错误:
Py4JJavaError Traceback (most recent call last)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o110.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '[' expecting <EOF>
你能帮我看看正确的语法吗?
在传递给 spark.sql
之前打印 SQL 查询并检查 SQL 查询是否正常。另外,分享有问题的 SQL 查询。
def output_agg(output_table_1):
query = f"""select * from {output_table_1}"""
print(query)
output_agg_1 = spark.sql(query)
return output_agg_1
如果 SQL 查询看起来不错,则可能的问题可能是 table 未在 spark 中注册。
正如评论中所讨论的,既然你想联合多个dfs,你可以这样做
from functools import reduce
from pyspark.sql import DataFrame
dfs_list = [output_df1, output_df2, output_df3, output_df4]
df_combined = reduce(DataFrame.unionAll, dfs_list)
注意:确保所有 df 中的列顺序相同
spark sql select
需要给 table/temporary table。首先将dataframe注册为临时table,然后执行SQL语句。
output_df1.createOrReplaceTempView('output_table')
def output_agg(output_table_1):
output_agg_1 = spark.sql(f"""
select * from {output_table_1}
""")
return output_agg_1
output_agg('output_table')
我无法对已完成的问题添加评论。学习SparkSQL的资料是SparkSQL参考。 https://spark.apache.org/docs/latest/sql-ref.html