'sqlContext' is not defined when my function calls another function
I have a function all_purch_spark() that sets up a Spark context and a SQL context for five different tables. The same function then successfully runs SQL queries against an AWS Redshift database. It works fine. I've included the entire function below (with sensitive data removed, of course). Please forgive its length, but given the problem I'm facing I wanted to show it in full.
My problem is with the second function, repurch_prep(), and how it calls the first function all_purch_spark(). I can't figure out how to avoid this error: NameError: name 'sqlContext' is not defined
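The same pattern, boiled down to a few lines, reproduces the error (these names are hypothetical, not my real code):

def first():
    sql_ctx = "pretend this is SQLContext(sc)"  # local to first()
    return sql_ctx

def second():
    first()          # calls first(), but its local names don't carry over
    print(sql_ctx)   # NameError: name 'sql_ctx' is not defined

second()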
I'll show both functions and the error below.
Here is the first function, all_purch_spark(). Again, I'm including the whole thing for reference. I know it's long, but I'm not sure it can be reduced to a meaningful minimal example.
def all_purch_spark():
    config = {
        'redshift_user': 'tester123',
        'redshift_pass': '*****************',
        'redshift_port': "5999",
        'redshift_db': 'my_database',
        'redshift_host': 'redshift.my_database.me',
    }
    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
        "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
    sqlContext = SQLContext(sc)
    ## Set schema and tables to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  # set as users table in both schemas
    purchases_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table1}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    customers_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table2}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    memberships_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table3}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    users_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table4}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    cusers_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema2}.{table4}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')
    all_purch = spark.sql("SELECT \
        p_paid.customer_id AS p_paid_user_id \
        ,p_trial.created_at AS trial_start_date \
        ,p_paid.created_at \
        ,cu.graduation_year \
        ,lower(cu.student_year) AS student_year \
        ,lower(p_paid.description) as product \
        ,u.email \
        ,u.id AS u_user_id \
        ,cu.id AS cu_user_id \
        FROM \
        purchases AS p_paid \
        INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
        INNER JOIN customers AS c on c.id = p_paid.customer_id \
        INNER JOIN memberships AS m on m.id = c.membership_id \
        INNER JOIN users AS u on u.id = m.user_id \
        INNER JOIN cusers AS cu on cu.id = u.id \
        WHERE \
        p_trial.created_at >= '2018-03-01' \
        AND p_paid.created_at >= '2018-03-01' \
        AND u.institution_contract = false \
        AND LOWER(u.email) not like '%hotmail.me%' \
        AND LOWER(u.email) not like '%gmail.com%' \
        AND p_trial.description like '% Day Free Trial' \
        AND p_paid.status = 'paid' \
        GROUP BY \
        p_paid_user_id \
        ,trial_start_date \
        ,p_paid.created_at \
        ,u.email \
        ,cu.graduation_year \
        ,student_year \
        ,product \
        ,cu_user_id \
        ,u_user_id \
        ORDER BY p_paid_user_id")
    all_purch.registerTempTable("all_purch_table")
    return all_purch
Here is the second function, which calls the function above. It should select against the registered temp table view set up in the first function:
def repurch_prep():
    all_purch_spark()
    all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
    return all_repurch
When I run repurch_prep() it throws the exception below, even though the SQL context is defined in the function above. I've tried using the return value from that function, but I can't figure out how to make it work:
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
in <module>
----> 1 repurch_prep()
~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
735 #sc = SparkContext().getOrCreate()
736 #sqlContext = SQLContext()
--> 737 all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
738 (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
739 ORDER BY p_paid_user_id ASC")
NameError: name 'sqlContext' is not defined
Any help is greatly appreciated.
The solution, courtesy of @Lamanus, was to move these variables outside the functions and make them global, rather than defining them inside one function (as I had done) and then calling that function from another.
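Boiled down, the change is just about where the shared names live. A minimal sketch of the idea (hypothetical names, not the actual Spark objects):

# sqlContext was local to all_purch_spark(), so repurch_prep() could never see it.
# Defining the shared object at module level makes it visible to every function.
shared_ctx = "pretend this is the SQL context"  # module-level (global) name

def first():
    return shared_ctx.upper()    # visible here

def second():
    first()
    return shared_ctx.lower()    # visible here too -- no NameError

The full configuration, moved out of all_purch_spark() and into module scope, is below: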
############### SPARK REDSHIFT GLOBAL CONFIG #####################
config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
}
from pyspark import SparkContext, SparkConf, SQLContext
jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
]
conf = (
    SparkConf()
    .setAppName("S3 with Redshift")
    .set("spark.driver.extraClassPath", ":".join(jars))
    .set("spark.hadoop.fs.s3a.path.style.access", True)
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("com.amazonaws.services.s3.enableV4", True)
    .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
    .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)
sc = SparkContext(conf=conf).getOrCreate()
###############################################################
def all_purch_spark():
    sqlContext = SQLContext(sc)
    ## Set schema and tables to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  # set as users table in both schemas
    purchases_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table1}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    customers_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table2}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    memberships_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table3}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    users_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema1}.{table4}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    cusers_df = sqlContext.read \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
        .option("dbtable", f"{schema2}.{table4}") \
        .option("user", config.get('redshift_user')) \
        .option("password", config.get('redshift_pass')) \
        .load()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')
    all_purch = spark.sql("SELECT \
        p_paid.customer_id AS p_paid_user_id \
        ,p_trial.created_at AS trial_start_date \
        ,p_paid.created_at \
        ,cu.graduation_year \
        ,lower(cu.student_year) AS student_year \
        ,lower(p_paid.description) as product \
        ,u.email \
        ,u.id AS u_user_id \
        ,cu.id AS cu_user_id \
        FROM \
        purchases AS p_paid \
        INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
        INNER JOIN customers AS c on c.id = p_paid.customer_id \
        INNER JOIN memberships AS m on m.id = c.membership_id \
        INNER JOIN users AS u on u.id = m.user_id \
        INNER JOIN cusers AS cu on cu.id = u.id \
        WHERE \
        p_trial.created_at >= '2018-03-01' \
        AND p_paid.created_at >= '2018-03-01' \
        AND u.institution_contract = false \
        AND LOWER(u.email) not like '%hotmail.me%' \
        AND LOWER(u.email) not like '%gmail.com%' \
        AND p_trial.description like '% Day Free Trial' \
        AND p_paid.status = 'paid' \
        GROUP BY \
        p_paid_user_id \
        ,trial_start_date \
        ,p_paid.created_at \
        ,u.email \
        ,cu.graduation_year \
        ,student_year \
        ,product \
        ,cu_user_id \
        ,u_user_id \
        ORDER BY p_paid_user_id")
    all_purch.registerTempTable("all_purch_table")
    return all_purch
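For anyone who prefers not to rely on module-level variables, a variation that also appears to work (a sketch, assuming both functions run against the same Spark session): temp views registered with registerTempTable() / createOrReplaceTempView() belong to the SparkSession, and SparkSession.builder.getOrCreate() returns the already-running session, so repurch_prep() can fetch the session itself instead of referencing a sqlContext defined in another function:

from pyspark.sql import SparkSession

def repurch_prep():
    all_purch_spark()  # registers the 'all_purch_table' temp view on the session
    spark = SparkSession.builder.getOrCreate()  # returns the existing session, not a new one
    all_repurch = spark.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
    return all_repurch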