Twitter data from Spark

I am learning how to integrate Twitter with Spark Streaming.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.SparkConf

    /**
     * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
     * stream. The stream is instantiated with credentials and optionally filters supplied by the
     * command line arguments.
     *
     * Run this on your local machine as
     *
     */
    object TwitterPopularTags {
      def main(args: Array[String]): Unit = {


        if (args.length < 4) {
          System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
            "<access token> <access token secret> [<filters>]")
          System.exit(1)
        }

        StreamingExamples.setStreamingLogLevels()

        val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
        val filters = args.takeRight(args.length - 4)

        // Set the system properties so that the Twitter4j library used by the Twitter stream
        // can use them to generate OAuth credentials
        System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
        System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
        System.setProperty("twitter4j.oauth.accessToken", accessToken)
        System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

        val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        val stream = TwitterUtils.createStream(ssc, None, filters) // DStream of twitter4j Status objects

        val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

        val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                         .map{case (topic, count) => (count, topic)}
                         .transform(_.sortByKey(false))

        val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
                         .map{case (topic, count) => (count, topic)}
                         .transform(_.sortByKey(false))


        // Print popular hashtags
        topCounts60.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
          topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
        })

        topCounts10.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
          topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

I can't fully understand the following two lines of code:

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

Could anyone explain these two lines to me?

Thanks and regards,

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

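args is an array; take(4) returns a sub-array containing its first (leftmost) four elements. Assigning that sub-array to the pattern Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) means the val consumerKey holds the first element, consumerSecret holds the second, and so on. This is a neat Scala trick for "unpacking" an array (it also works with other collections) into named values.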
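To see the unpacking in isolation, here is a minimal sketch. The object name UnpackDemo, the sample values, and the helper unpackTwo are made up for illustration and are not part of the Spark example. The sketch also shows one caveat worth knowing: a pattern like this only matches when the number of elements is exactly right, otherwise it fails at runtime with a scala.MatchError, which is presumably why the original code checks args.length first and then calls take(4).

```scala
object UnpackDemo {
  // Hypothetical input standing in for the command-line arguments.
  val sampleArgs: Array[String] =
    Array("key", "secret", "token", "tokenSecret", "#spark", "#scala")

  // The pattern on the left binds each of the four elements to its own val.
  val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) =
    sampleArgs.take(4)

  // The same kind of pattern works with other sequences, for example a List.
  val List(first, second) = List(1, 2)

  // A safer variant (illustrative helper): match explicitly and handle
  // the wrong-length case instead of risking a MatchError.
  def unpackTwo(xs: Array[String]): Either[String, (String, String)] =
    xs match {
      case Array(a, b) => Right((a, b))
      case _           => Left(s"expected 2 elements, got ${xs.length}")
    }
}
```

With these sample values, UnpackDemo.consumerKey is "key" and UnpackDemo.accessTokenSecret is "tokenSecret".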

    val filters = args.takeRight(args.length - 4)

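takeRight(n) returns the sub-array on the right, that is, the last n elements of the array. Here, an array of every element except the first four is assigned to a new value named filters.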
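As a small sketch of the second line: taking the last args.length - 4 elements gives the same result as dropping the first four with drop(4), and when exactly four arguments are passed the result is simply an empty array. The object FiltersDemo and its two helper methods are hypothetical names used only for this illustration.

```scala
object FiltersDemo {
  // Mirrors the line from the question: keep the last (length - 4) elements.
  def filtersOf(args: Array[String]): List[String] =
    args.takeRight(args.length - 4).toList

  // Equivalent formulation: skip the first four elements.
  def filtersViaDrop(args: Array[String]): List[String] =
    args.drop(4).toList
}
```

For example, with the arguments Array("k", "s", "t", "ts", "#spark", "#scala"), both helpers return List("#spark", "#scala"); with only the four credentials, both return an empty list, so no filter is applied to the stream.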