Pyspark Dataframe Lambda Map Function of SQL Query
Suppose we have a pyspark.sql.dataframe.DataFrame object:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
I have a function that runs a SQL query against each row of the DataFrame:
def getInfo(data):
    param_name = data['name']
    param_gender = data['gender']
    param_age = data['age']
    # Query the people_info table for the record matching this row.
    sql_query = "SELECT * FROM people_info WHERE name = '{0}' AND gender = '{1}' AND age = {2}".format(param_name, param_gender, param_age)
    info = spark.sql(sql_query)
    return info
I am trying to run the function over each row via map:
df_info = df.rdd.map(lambda x: getInfo(x))
I get the error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
The error message is actually telling you exactly what went wrong: your function is trying to access the SparkContext (spark.sql(sql_query)) from inside a transformation (df.rdd.map(lambda x: getInfo(x))). The SparkContext exists only on the driver, while the function passed to map is serialized and shipped to the workers, where no SparkContext is available.
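Note that the failure does not depend on what the query does; any closure that captures the session or context trips the same guard, and because RDD transformations are lazy, the PicklingError typically only surfaces when an action actually submits the job. A minimal sketch (the "SELECT 1" query is just an illustrative stand-in):

rdd = df.rdd.map(lambda row: spark.sql("SELECT 1"))  # lazily defined, no error yet
rdd.collect()  # PicklingError raised here: the closure captures `spark`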
Here is what I think you are trying to do:
df = sc.parallelize([['John', 'male', 26],
                     ['Teresa', 'female', 25],
                     ['Jacob', 'male', 6]]).toDF(['name', 'gender', 'age'])
people = spark.table("people_info")
people.join(df,
            on=[people.name == df.name,
                people.gender == df.gender,
                people.age == df.age],
            how="inner")
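One detail to watch: joining on column expressions keeps both copies of name, gender, and age in the result, which makes those columns ambiguous downstream. Since the key columns share names in both DataFrames, an equivalent equi-join on a list of column names lets Spark keep each key column only once:

# Equi-join on column names; each join key appears once in the output.
df_info = people.join(df, on=["name", "gender", "age"], how="inner")
df_info.show()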
A couple of things worth noting here: the join is planned on the driver and runs as one distributed operation, so no task code ever touches the SparkContext; and a single join lets Spark optimize the lookup instead of issuing a separate query per row.
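If you really do need per-row logic rather than a join, here is one workable sketch, assuming people_info is small enough to collect to the driver: broadcast it as plain Python data, so the function shipped to the workers never references the SparkContext.

# Collect the lookup table once on the driver and broadcast it as plain data.
rows = spark.table("people_info").collect()
lookup = {(r['name'], r['gender'], r['age']): r.asDict() for r in rows}
lookup_bc = sc.broadcast(lookup)

def get_info(data):
    # Runs on the workers, but only reads the broadcast dict -- no spark.sql here.
    return lookup_bc.value.get((data['name'], data['gender'], data['age']))

df_info = df.rdd.map(get_info)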