'sqlContext' 在我的函数调用另一个函数时未定义

'sqlContext' is not defined when my function calls another function

我有一个函数 all_purch_spark() 可以为五个不同的 table 设置 Spark 上下文和 SQL 上下文。然后,相同的函数成功地 运行 对 AWS Redshift 数据库进行了 sql 查询。它很好用。我在下面包含了整个函数(当然删除了敏感数据)。请原谅它的长度,但鉴于我面临的问题,我想展示它。

我的问题是第二个函数 repurch_prep() 以及它如何调用第一个函数 all_purch_spark()。我不知道如何避免这样的错误:NameError: name 'sqlContext' is not defined

我将在下面展示这两个函数和错误。

这是第一个函数 all_purch_spark()。我再次将整个功能放在这里以供参考。我知道它很长,但不确定是否可以将其简化为一个有意义的示例。

def all_purch_spark():
    config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
    }

    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()
        
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")
   
    return all_purch    

这里是调用上面函数的第二个函数。它应该 select 反对在上面的函数中设置的注册 table 视图:

    def repurch_prep():
        all_purch_spark()
        all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
        return all_repurch

当我 运行 repurch_prep() 它抛出以下异常,即使在上面的函数中定义了 SQL 上下文。我试过上面的返回值,但不知道如何让它工作:

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in 
----> 1 repurch_prep()

~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
    735     #sc = SparkContext().getOrCreate()
    736     #sqlContext = SQLContext()
--> 737     all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
    738     (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
    739     ORDER BY p_paid_user_id ASC")

NameError: name 'sqlContext' is not defined

非常感谢任何帮助。

@Lamanus 的解决方案是将变量放在函数之外,使它们成为全局变量,而不是将它们存储在一个函数中(就像我所做的那样),然后从另一个函数中调用该函数。

############### SPARK REDSHIFT GLOBAL CONFIG #####################
    config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
    }

    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
###############################################################


def all_purch_spark():
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()
        
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")
   
    return all_purch