如何计算火花流中每秒的项目数?

how to count number of items per second in spark streaming?

我得到一个 json 流,我想每秒计算状态为 "Pending" 的项目数。我怎么做?到目前为止,我有下面的代码,并且 1) 我不确定它是否正确。 2) 它 returns 我是一个 Dstream 但我的 objective 是每秒将一个数字存储到 cassandra 或队列或者你可以想象有函数 public void store(Long number){} .

  // #1
 jsonMessagesDStream
        .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    }).countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
        @Override
        public void call(JavaPairRDD<String, Long> stringLongJavaPairRDD) throws Exception {
            store(stringLongJavaPairRDD.count());
        }
    });

尝试了以下方法:仍然无效,因为它始终打印零不确定是否正确?

     // #2
    jsonMessagesDStream
        .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            store(stringJavaRDD.count());
        }
    });

部分堆栈跟踪

16/09/10 17:51:39 INFO SparkContext: Starting job: count at Consumer.java:88
16/09/10 17:51:39 INFO DAGScheduler: Got job 17 (count at Consumer.java:88) with 4 output partitions
16/09/10 17:51:39 INFO DAGScheduler: Final stage: ResultStage 17 (count at Consumer.java:88)
16/09/10 17:51:39 INFO DAGScheduler: Parents of final stage: List()
16/09/10 17:51:39 INFO DAGScheduler: Missing parents: List()
16/09/10 17:51:39 INFO DAGScheduler: Submitting ResultStage 17 (MapPartitionsRDD[35] at filter at Consumer.java:72), which has no missing parents

打印 BAR 但不打印 FOO

//Debug code
jsonMessagesDStream
        .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            System.out.println("****************FOO******************");
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            System.out.println("*****************BAR******************");
            store(stringJavaRDD.count());
        }
    });

因为你已经过滤了结果集,你可以在 DStream/RDD.

上做一个 count()

此外,如果您每秒钟都在从源代码中阅读,我认为您不需要在此处打开窗口。当微批间隔与聚合频率不匹配时,需要开窗。您正在查看不到一秒的微批处理频率吗?

It returns me a Dstream but my objective is to store a number every second to cassandra or queue

Spark 的工作方式是每次对现有 DStream 进行计算时它都会提供一个 DStream。这样你就可以轻松地将函数链接在一起。您还应该了解 Spark 中转换和操作之间的区别。 filter()、count() 等函数是转换,因为它们对 DStream 进行操作并提供新的 DStream。但是如果您需要副作用(如打印、推送到数据库等),您应该查看 Spark 操作。

如果您需要将 DStream 推送到 cassandra,您应该查看 cassandra 连接器,它会公开功能(Spark 术语中的操作),您可以使用这些连接器将数据推送到 cassandra。

您可以使用 1 秒的滑动 window 和 reduceByKey 函数,而不管批次间隔如何。选择 1 秒幻灯片间隔后,您将每秒收到一个商店呼叫事件。