Pyspark Dataframe Lambda Map Function of SQL Query

Suppose we have a pyspark.sql.dataframe.DataFrame object:

df = sc.parallelize([['John', 'male', 26], 
                 ['Teresa', 'female', 25], 
                 ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
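
For reference, df.show() would print the table below (a quick sanity check; column order follows the toDF call):

df.show()
# +------+------+---+
# |  name|gender|age|
# +------+------+---+
# |  John|  male| 26|
# |Teresa|female| 25|
# | Jacob|  male|  6|
# +------+------+---+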

I have a function that runs a SQL query against each row of the DataFrame:

def getInfo(data):
    param_name = data['name']
    param_gender = data['gender']
    param_age = data['age']

    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(param_name, param_gender, param_age)
    # spark.sql needs the SparkSession, which only exists on the driver
    info = spark.sql(sql_query)

    return info

I am trying to run the function on each row through map:

df_info = df.rdd.map(lambda x: getInfo(x))

I get the error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The error message is telling you exactly what is wrong: your function is trying to access the SparkContext (spark.sql(sql_query)) from inside a transformation (df.rdd.map(lambda x: getInfo(x))).
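
To make the constraint concrete: anything that touches the SparkSession has to run on the driver. A (slow) driver-side loop like the hypothetical sketch below would avoid the pickling error, because collect() brings the rows back to the driver before spark.sql is called, but it issues one query per row, which is exactly the work a join avoids:

# Hypothetical workaround, shown only to illustrate the rule:
# collect the rows to the driver, then call spark.sql from driver code.
results = []
for row in df.collect():
    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(row['name'], row['gender'], row['age'])
    results.append(spark.sql(sql_query))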

Here is what I think you are trying to do:

df = sc.parallelize([['John', 'male', 26], 
                 ['Teresa', 'female', 25], 
                 ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
people = spark.table("people_info")

people.join(df, on=[people.name == df.name, people.gender == df.gender, people.age == df.age], how="inner")
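
Since both sides share the same column names, the joined result will carry duplicate name/gender/age columns. A minimal follow-up sketch, assuming you only need the people_info side of the match:

df_info = people.join(
    df,
    on=[people.name == df.name, people.gender == df.gender, people.age == df.age],
    how="inner",
).select(people["*"])  # keep only the people_info columns
df_info.show()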

Here are a couple of