pyspark 从逗号分隔值列表中创建多行
pyspark create multiple rows from a list of comma separated values
在 pyspark sqlcontext sql 中,已编写代码以获取文本然后重新格式化
但问题是这样的
在数据框中有这样的东西
代码就像
hash_tags_fun = udf(lambda t: re.findall('(#[^#]\w{3,})', t))
hash_tags_in_tweets_df.registerTempTable("hash_tags_table")
hash_tags_result = sqlContext.sql("SELECT text FROM hash_tags_table")
hash_tags_list = hash_tags_result.withColumn('text', hash_tags_fun('text'))
hash_tags_list.show(3)
+-------------------+
| text|
+-------------------+
| [#shutUpAndDANCE]|
| [#SHINee, #AMBER]|
|[#JR50, #flipagram]|
+-------------------+
我需要类似的东西
+-------------------+
| text|
+-------------------+
| #shutUpAndDANCE|
| #SHINee|
| #AMBER|
| #JR50|
| #flipagram|
+-------------------+
hash_tags_list.withColumn("text", explode("text")) has given an error saying
AnalysisException: u"cannot resolve 'explode(text
)' due to data type
mismatch: input to function explode should be array or map type, not
string;; \n'Project [explode(text#24) AS text#68]\n+-
AnalysisBarrier\n
+- Project [(text#9) AS text#24]\n
+- Project [text#9]\n
+- SubqueryAlias hash_tags_table\n
+- Project [text#9]\n
+- Filter text#9 LIKE %#%\n
+- SubqueryAlias twt\n
+- SubqueryAlias tweets\n
+- Relation[country#6,id#7,place#8,text#9,user#10] json\n"
在 上扩展:
您的列看起来像一个数组,但它实际上是一个字符串 - 这就是您对 explode()
的调用不起作用的原因。您必须先将列转换为数组。
这将涉及删除前导和尾随方括号并拆分逗号字符。
先去掉前后括号,可以用pyspark.sql.functions.regexp_replace()
:
from pyspark.sql.functions import regexp_replace, split
df = hash_tags_list.select(regexp_replace("text", r"(^\[)|(\]$)", "").alias("text"))
df.show()
#+-----------------+
#| text|
#+-----------------+
#| #shutUpAndDANCE|
#| #SHINee, #AMBER|
#|#JR50, #flipagram|
#+-----------------+
现在按逗号拆分,后跟 space:
df = df.select(split("text", ", ").alias("text"))
df.show()
#+-------------------+
#| text|
#+-------------------+
#| [#shutUpAndDANCE]|
#| [#SHINee, #AMBER]|
#|[#JR50, #flipagram]|
#+-------------------+
您会注意到这个 输出 与您开始时完全一样,但是当我们检查架构时,我们发现这些实际上是字符串数组:
df.printSchema()
#root
# |-- text: array (nullable = true)
# | |-- element: string (containsNull = true)
将此与原始 DataFrame 的架构进行比较:
hash_tags_list.printSchema()
#root
# |-- text: string (nullable = true)
将数据作为数组,现在可以调用 explode()
:
from pyspark.sql.functions import explode
df = df.select(explode("text").alias("hashtags"))
df.show()
#+---------------+
#| hashtags|
#+---------------+
#|#shutUpAndDANCE|
#| #SHINee|
#| #AMBER|
#| #JR50|
#| #flipagram|
#+---------------+
在 pyspark sqlcontext sql 中,已编写代码以获取文本然后重新格式化 但问题是这样的
在数据框中有这样的东西 代码就像
hash_tags_fun = udf(lambda t: re.findall('(#[^#]\w{3,})', t))
hash_tags_in_tweets_df.registerTempTable("hash_tags_table")
hash_tags_result = sqlContext.sql("SELECT text FROM hash_tags_table")
hash_tags_list = hash_tags_result.withColumn('text', hash_tags_fun('text'))
hash_tags_list.show(3)
+-------------------+
| text|
+-------------------+
| [#shutUpAndDANCE]|
| [#SHINee, #AMBER]|
|[#JR50, #flipagram]|
+-------------------+
我需要类似的东西
+-------------------+
| text|
+-------------------+
| #shutUpAndDANCE|
| #SHINee|
| #AMBER|
| #JR50|
| #flipagram|
+-------------------+
hash_tags_list.withColumn("text", explode("text")) has given an error saying
AnalysisException: u"cannot resolve 'explode(
text
)' due to data type mismatch: input to function explode should be array or map type, not string;; \n'Project [explode(text#24) AS text#68]\n+- AnalysisBarrier\n
+- Project [(text#9) AS text#24]\n
+- Project [text#9]\n
+- SubqueryAlias hash_tags_table\n
+- Project [text#9]\n
+- Filter text#9 LIKE %#%\n
+- SubqueryAlias twt\n
+- SubqueryAlias tweets\n
+- Relation[country#6,id#7,place#8,text#9,user#10] json\n"
在
您的列看起来像一个数组,但它实际上是一个字符串 - 这就是您对 explode()
的调用不起作用的原因。您必须先将列转换为数组。
这将涉及删除前导和尾随方括号并拆分逗号字符。
先去掉前后括号,可以用pyspark.sql.functions.regexp_replace()
:
from pyspark.sql.functions import regexp_replace, split
df = hash_tags_list.select(regexp_replace("text", r"(^\[)|(\]$)", "").alias("text"))
df.show()
#+-----------------+
#| text|
#+-----------------+
#| #shutUpAndDANCE|
#| #SHINee, #AMBER|
#|#JR50, #flipagram|
#+-----------------+
现在按逗号拆分,后跟 space:
df = df.select(split("text", ", ").alias("text"))
df.show()
#+-------------------+
#| text|
#+-------------------+
#| [#shutUpAndDANCE]|
#| [#SHINee, #AMBER]|
#|[#JR50, #flipagram]|
#+-------------------+
您会注意到这个 输出 与您开始时完全一样,但是当我们检查架构时,我们发现这些实际上是字符串数组:
df.printSchema()
#root
# |-- text: array (nullable = true)
# | |-- element: string (containsNull = true)
将此与原始 DataFrame 的架构进行比较:
hash_tags_list.printSchema()
#root
# |-- text: string (nullable = true)
将数据作为数组,现在可以调用 explode()
:
from pyspark.sql.functions import explode
df = df.select(explode("text").alias("hashtags"))
df.show()
#+---------------+
#| hashtags|
#+---------------+
#|#shutUpAndDANCE|
#| #SHINee|
#| #AMBER|
#| #JR50|
#| #flipagram|
#+---------------+