如何从 Apache Spark 中的 Twitter 推文中提取主题标签(或其他数组)
How to extract hashtags (or other Arrays) from Twitter Tweets in Apache Spark
我正在尝试使用 Apache Spark 从 JSON Tweet 对象的文件中对 Twitter Tweet 数据进行分析。
下面是我如何使用 Spark 的 jsonFile 方法加载它:
val sqc = new org.apache.spark.sql.SQLContext(sc)
val tweets = sqc.jsonFile("stored_tweets/*.json")
tweets.registerTempTable("tweets")
接下来,我仅对具有以下行的主题标签实体进行采样:
val hashtags = sqc.sql("SELECT entities.hashtags FROM tweets LIMIT 3")
hashtags.take(1)
结果是:
res14: Array[org.apache.spark.sql.Row] =
Array([ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51,
61),marketing], [ArrayBuffer(88, 102),growthhacking],
[ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup],
[ArrayBuffer(121, 138),contentmarketing])])
如果你仔细观察,数据就在那里,但它被包裹在 Array([ArrayBuffer(xx,yy), hashtag).
有些人建议使用 .map() 或 .flatMap() 使用或不使用 [=116= 的某些自定义函数.getAs() 方法,但我不明白它应该如何工作。
有什么想法吗?
5 月 23 日更新:
一直在浏览 Spark 文档。还是没有进展。 Spark SQL Row 文档 (https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/sql/api/java/Row.html) 建议使用
这样的代码
import org.apache.spark.sql._
val row = hashtags.take(1)
row(0)
但是,在这种情况下,会产生
res124: org.apache.spark.sql.Row = [ArrayBuffer([ArrayBuffer(43,
50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88,
102),growthhacking], [ArrayBuffer(103, 111),inbound],
[ArrayBuffer(112, 120),startup], [ArrayBuffer(121,
138),contentmarketing])]
这里,这个 Whosebug post (org.apache.spark.sql.Row to Int) 建议使用 .get() 系列方法,例如 .getString () 但我的尝试并没有产生太多结果:
row(0).getString(0)
产量:
java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer
cannot be cast to java.lang.String
并且,
row(0).getString(1)
产量:
:28: error: value getString is not a member of Any
row(0).getString(1)
和
row(0)(0)
产量
res184: Any = ArrayBuffer([ArrayBuffer(43, 50),online],
[ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking],
[ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup],
[ArrayBuffer(121, 138),contentmarketing])
但是
row(0)(0)(0)
产量
:28: error: Any does not take parameters
row(0)(0)(0)
所以,还是卡住了。
5 月 24 日更新:
在尝试使用 .textFile()(非 Spark SQL 方式)和使用本机 Scala JSON 解析功能之后,按照此处的说明:Spark SQL - How to select on dates stored as UTC millis from the epoch?, and getting stuck with Spark json4s compatibility issues described here: https://github.com/json4s/json4s/issues/212 ,如果可以解决这些问题,我决定尝试使用 Python 和 pyspark。
更新 2,5 月 24 日:
在朋友的帮助下,他建议尝试类似的方法:
import scala.collection.mutable.ArrayBuffer
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
我终于找到了一些进展,因为这有效:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
并产生:
res53: Any = [ArrayBuffer(43, 50),online]
但是,在进行时,按照提示:
val a = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
a.asInstanceOf[ArrayBuffer[Any, String]]
结果令人沮丧:
22: error: wrong number of type arguments for
scala.collection.mutable.ArrayBuffer, should be 1
a.asInstanceOf[ArrayBuffer[Any, String]]
然后试试这个:
a.asInstanceOf[ArrayBuffer[Any]]
产量:
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to
scala.collection.mutable.ArrayBuffer
又卡住了。
更新 3,5 月 24 日:
所以,在得到朋友的帮助后,我得到了两种可能的解决方案,这两种解决方案都不是原始问题的直接答案,但无论如何"kind of" 解决了问题。
选项 1(简单的选项,Python):使用 pyspark – 你可以说:
row[0][0][1]
选项 2(Scala 中丑陋的解决方案):
val solution = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).toString().split(" ")(1).split(",")(1).split("]")(0)
产生:
scala> solution
res28: String = online
我不得不使用 toString() 而不是使用 .asInstanceOf() 的原因是最终对象包装是:
Any = [ArrayBuffer(43, 50),online]
..我们找不到 .asInstanceOf() 方法。这是我们尝试过的事情:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[Row](0).getString(1)
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).instanceOf[Array[ArrayBuffer[Any], String]]
..但其中 none 有效。
不过,我希望在 Scala 中有一种更优雅的方式来执行此操作,因为 Spark + Scala 的整个 "pipeline building nature" 首先吸引我的是这个包。
正如推特上的回复。这个 Twitter 架构过于嵌套,因此总体来说非常复杂。但是,我们可以在未来为复杂的嵌套字段添加按名称访问器的支持以简化这一点。
附加 Python 和 Scala 版本。
我正在尝试使用 Apache Spark 从 JSON Tweet 对象的文件中对 Twitter Tweet 数据进行分析。
下面是我如何使用 Spark 的 jsonFile 方法加载它:
val sqc = new org.apache.spark.sql.SQLContext(sc)
val tweets = sqc.jsonFile("stored_tweets/*.json")
tweets.registerTempTable("tweets")
接下来,我仅对具有以下行的主题标签实体进行采样:
val hashtags = sqc.sql("SELECT entities.hashtags FROM tweets LIMIT 3")
hashtags.take(1)
结果是:
res14: Array[org.apache.spark.sql.Row] = Array([ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])])
如果你仔细观察,数据就在那里,但它被包裹在 Array([ArrayBuffer(xx,yy), hashtag).
有些人建议使用 .map() 或 .flatMap() 使用或不使用 [=116= 的某些自定义函数.getAs() 方法,但我不明白它应该如何工作。
有什么想法吗?
5 月 23 日更新:
一直在浏览 Spark 文档。还是没有进展。 Spark SQL Row 文档 (https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/sql/api/java/Row.html) 建议使用
这样的代码import org.apache.spark.sql._
val row = hashtags.take(1)
row(0)
但是,在这种情况下,会产生
res124: org.apache.spark.sql.Row = [ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])]
这里,这个 Whosebug post (org.apache.spark.sql.Row to Int) 建议使用 .get() 系列方法,例如 .getString () 但我的尝试并没有产生太多结果:
row(0).getString(0)
产量:
java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to java.lang.String
并且,
row(0).getString(1)
产量:
:28: error: value getString is not a member of Any row(0).getString(1)
和
row(0)(0)
产量
res184: Any = ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])
但是
row(0)(0)(0)
产量
:28: error: Any does not take parameters row(0)(0)(0)
所以,还是卡住了。
5 月 24 日更新:
在尝试使用 .textFile()(非 Spark SQL 方式)和使用本机 Scala JSON 解析功能之后,按照此处的说明:Spark SQL - How to select on dates stored as UTC millis from the epoch?, and getting stuck with Spark json4s compatibility issues described here: https://github.com/json4s/json4s/issues/212 ,如果可以解决这些问题,我决定尝试使用 Python 和 pyspark。
更新 2,5 月 24 日:
在朋友的帮助下,他建议尝试类似的方法:
import scala.collection.mutable.ArrayBuffer
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
我终于找到了一些进展,因为这有效:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
并产生:
res53: Any = [ArrayBuffer(43, 50),online]
但是,在进行时,按照提示:
val a = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
a.asInstanceOf[ArrayBuffer[Any, String]]
结果令人沮丧:
22: error: wrong number of type arguments for scala.collection.mutable.ArrayBuffer, should be 1 a.asInstanceOf[ArrayBuffer[Any, String]]
然后试试这个:
a.asInstanceOf[ArrayBuffer[Any]]
产量:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.collection.mutable.ArrayBuffer
又卡住了。
更新 3,5 月 24 日:
所以,在得到朋友的帮助后,我得到了两种可能的解决方案,这两种解决方案都不是原始问题的直接答案,但无论如何"kind of" 解决了问题。
选项 1(简单的选项,Python):使用 pyspark – 你可以说:
row[0][0][1]
选项 2(Scala 中丑陋的解决方案):
val solution = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).toString().split(" ")(1).split(",")(1).split("]")(0)
产生:
scala> solution res28: String = online
我不得不使用 toString() 而不是使用 .asInstanceOf() 的原因是最终对象包装是:
Any = [ArrayBuffer(43, 50),online]
..我们找不到 .asInstanceOf() 方法。这是我们尝试过的事情:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[Row](0).getString(1)
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).instanceOf[Array[ArrayBuffer[Any], String]]
..但其中 none 有效。
不过,我希望在 Scala 中有一种更优雅的方式来执行此操作,因为 Spark + Scala 的整个 "pipeline building nature" 首先吸引我的是这个包。
正如推特上的回复。这个 Twitter 架构过于嵌套,因此总体来说非常复杂。但是,我们可以在未来为复杂的嵌套字段添加按名称访问器的支持以简化这一点。
附加 Python 和 Scala 版本。