如何计算火花流中每秒的项目数?
how to count number of items per second in spark streaming?
我得到一个 json 流,我想每秒计算状态为 "Pending" 的项目数。我怎么做?到目前为止,我有下面的代码,并且 1) 我不确定它是否正确。 2) 它 returns 我是一个 Dstream 但我的 objective 是每秒将一个数字存储到 cassandra 或队列或者你可以想象有函数 public void store(Long number){}
.
// #1
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
@Override
public void call(JavaPairRDD<String, Long> stringLongJavaPairRDD) throws Exception {
store(stringLongJavaPairRDD.count());
}
});
尝试了以下方法:仍然无效,因为它始终打印零不确定是否正确?
// #2
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
store(stringJavaRDD.count());
}
});
部分堆栈跟踪
16/09/10 17:51:39 INFO SparkContext: Starting job: count at Consumer.java:88
16/09/10 17:51:39 INFO DAGScheduler: Got job 17 (count at Consumer.java:88) with 4 output partitions
16/09/10 17:51:39 INFO DAGScheduler: Final stage: ResultStage 17 (count at Consumer.java:88)
16/09/10 17:51:39 INFO DAGScheduler: Parents of final stage: List()
16/09/10 17:51:39 INFO DAGScheduler: Missing parents: List()
16/09/10 17:51:39 INFO DAGScheduler: Submitting ResultStage 17 (MapPartitionsRDD[35] at filter at Consumer.java:72), which has no missing parents
打印 BAR 但不打印 FOO
//Debug code
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
System.out.println("****************FOO******************");
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
System.out.println("*****************BAR******************");
store(stringJavaRDD.count());
}
});
因为你已经过滤了结果集,你可以在 DStream/RDD.
上做一个 count()
此外,如果您每秒钟都在从源代码中阅读,我认为您不需要在此处打开窗口。当微批间隔与聚合频率不匹配时,需要开窗。您正在查看不到一秒的微批处理频率吗?
It returns me a Dstream but my objective is to store a number every second to cassandra or queue
Spark 的工作方式是每次对现有 DStream 进行计算时它都会提供一个 DStream。这样你就可以轻松地将函数链接在一起。您还应该了解 Spark 中转换和操作之间的区别。 filter()、count() 等函数是转换,因为它们对 DStream 进行操作并提供新的 DStream。但是如果您需要副作用(如打印、推送到数据库等),您应该查看 Spark 操作。
如果您需要将 DStream 推送到 cassandra,您应该查看 cassandra 连接器,它会公开功能(Spark 术语中的操作),您可以使用这些连接器将数据推送到 cassandra。
您可以使用 1 秒的滑动 window 和 reduceByKey 函数,而不管批次间隔如何。选择 1 秒幻灯片间隔后,您将每秒收到一个商店呼叫事件。
我得到一个 json 流,我想每秒计算状态为 "Pending" 的项目数。我怎么做?到目前为止,我有下面的代码,并且 1) 我不确定它是否正确。 2) 它 returns 我是一个 Dstream 但我的 objective 是每秒将一个数字存储到 cassandra 或队列或者你可以想象有函数 public void store(Long number){}
.
// #1
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
@Override
public void call(JavaPairRDD<String, Long> stringLongJavaPairRDD) throws Exception {
store(stringLongJavaPairRDD.count());
}
});
尝试了以下方法:仍然无效,因为它始终打印零不确定是否正确?
// #2
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
store(stringJavaRDD.count());
}
});
部分堆栈跟踪
16/09/10 17:51:39 INFO SparkContext: Starting job: count at Consumer.java:88
16/09/10 17:51:39 INFO DAGScheduler: Got job 17 (count at Consumer.java:88) with 4 output partitions
16/09/10 17:51:39 INFO DAGScheduler: Final stage: ResultStage 17 (count at Consumer.java:88)
16/09/10 17:51:39 INFO DAGScheduler: Parents of final stage: List()
16/09/10 17:51:39 INFO DAGScheduler: Missing parents: List()
16/09/10 17:51:39 INFO DAGScheduler: Submitting ResultStage 17 (MapPartitionsRDD[35] at filter at Consumer.java:72), which has no missing parents
打印 BAR 但不打印 FOO
//Debug code
jsonMessagesDStream
.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
System.out.println("****************FOO******************");
JsonParser parser = new JsonParser();
JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
if (jsonObj != null && jsonObj.has("status")) {
return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
}
return false;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
System.out.println("*****************BAR******************");
store(stringJavaRDD.count());
}
});
因为你已经过滤了结果集,你可以在 DStream/RDD.
上做一个 count()此外,如果您每秒钟都在从源代码中阅读,我认为您不需要在此处打开窗口。当微批间隔与聚合频率不匹配时,需要开窗。您正在查看不到一秒的微批处理频率吗?
It returns me a Dstream but my objective is to store a number every second to cassandra or queue
Spark 的工作方式是每次对现有 DStream 进行计算时它都会提供一个 DStream。这样你就可以轻松地将函数链接在一起。您还应该了解 Spark 中转换和操作之间的区别。 filter()、count() 等函数是转换,因为它们对 DStream 进行操作并提供新的 DStream。但是如果您需要副作用(如打印、推送到数据库等),您应该查看 Spark 操作。
如果您需要将 DStream 推送到 cassandra,您应该查看 cassandra 连接器,它会公开功能(Spark 术语中的操作),您可以使用这些连接器将数据推送到 cassandra。
您可以使用 1 秒的滑动 window 和 reduceByKey 函数,而不管批次间隔如何。选择 1 秒幻灯片间隔后,您将每秒收到一个商店呼叫事件。