Append column to newly created dataframe by selecting column from old dataframe pyspark
I'm reading JSON, and I have a dictionary (dictn) whose keys tell me all the columns I should select from the JSON df.
I'm trying to create a new df and then append those columns whose keys from dictn appear in the JSON, but I'm getting the error below. Any help with this would be much appreciated, as I'm really new to Spark.
'Resolved attribute(s) ip#238 missing from in operator !Project [ip#238 AS ip#267].;;\n!Project [ip#238 AS ip#267]\n+- LogicalRDD false\
from pyspark.sql.functions import explode, lit
from pyspark.sql.types import StructType
import json
jsn={"body":[{"ip":"177.284.10.91","sg_message_id":"YcbG1IBnQ1-626TaUVg2bQ.filter1049p1las1-18982-5C868E5A-20.0","hostname":"d2214390ce89","useragent":"Mozilla/5.0 (Linux; Android 7.1.2; E6810) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.105 Mobile Safari/537.36","method_name":"mass_gifting","email":"test@aol.com","timestamp":1554076768,"url":"https://engagement.test.com/b/genghisgrill","object_id":42813,"category":["42813-3-11-19-bottomless-is-back","713","mass_gifting"],"sg_event_id":"Krfn-yDfTG-CQ-o8zhTb0w","event":"click","klass":"3-11-19-bottomless-is-back","url_offset":{"index":3,"type":"html"},"rails_env":"production","user_id":78003906,"business_id":713}],"account":"testaccount"}
dictn={'ip':'string',
       'sg_message_id':'string',
       'hostname':'string',
       'method_name':'string',
       'email':'string',
       'timestamp':'bigint',
       'smtp-id':'string',
       'object_id':'bigint',
       'response':'string',
       'sg_event_id':'string',
       'tls':'string',
       'event':'string',
       'klass':'string',
       'user_id':'string',
       'rails_env':'string',
       'business_id':'bigint'}
# Start from an empty DataFrame, intending to append columns one by one
schema = StructType([])
new_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

a = [json.dumps(jsn)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

x = df.select("body")
# Explode the "body" array into one row per element and flatten the struct
df1 = df.withColumn("foo", explode("body")).select("foo.*")

for k1, v1 in dictn.items():
    if k1 in df1.columns:
        # This line raises the error: df1[k1] belongs to a different DataFrame than new_df
        new_df = new_df.withColumn(k1, df1[k1])
    else:
        new_df = new_df.withColumn(k1, lit(10))
new_df.show()
You're getting that error because you're trying to add a new column by referencing a column from another DataFrame, which Spark doesn't support.
This question has already been asked and answered here: Add a column from another DataFrame
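(For completeness: when the extra column genuinely lives in a second DataFrame, the usual workaround is to join the two frames on a shared key instead of calling withColumn across them. A purely illustrative sketch, pretending "email" lives in a second frame keyed by "sg_event_id":

# Illustrative only: split "email" into a second DataFrame, then join it back
other_df = df1.select("sg_event_id", "email")
joined = df1.drop("email").join(other_df, on="sg_event_id", how="left")

That isn't needed in your case, though, because new_df is empty anyway.)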
But to achieve what you want here, you just need a select on df1, which gives you a new DataFrame containing the list of columns you got from your dictionary.
This should work for you:
from pyspark.sql.functions import col, lit

# Keep columns that exist in df1; substitute the literal 10 for the missing ones
select_expr = [col(c).alias(c) if c in df1.columns else lit(10).alias(c) for c, _ in dictn.items()]
new_df = df1.select(select_expr)
new_df.show()
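As a side note, the values in dictn ('string', 'bigint') look like Spark SQL type names, so if you also want each column cast to its declared type, the same comprehension can do that too. A sketch under that assumption:

# Cast each selected column (or the literal fallback) to the type recorded in dictn
select_expr = [(col(c) if c in df1.columns else lit(10)).cast(t).alias(c)
               for c, t in dictn.items()]
new_df = df1.select(select_expr)
new_df.show()

The alias(c) is what keeps the fallback columns named after the dictionary key instead of something like "CAST(10 AS STRING)".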