Spark Streaming 中的批量大小
Batch Size in Spark Streaming
我是 Spark 和 Spark Streaming 的新手。我正在处理 Twitter 流数据。我的任务涉及独立处理每条推文,例如计算每条推文中的单词数。根据我的阅读,每个输入批处理在 Spark Streaming 中的 RDD 上形成。因此,如果我给出 2 秒的批处理间隔,那么新的 RDD 将包含两秒内的所有推文,并且应用的任何转换都将应用于整个两秒的数据,并且无法在这两秒内处理单个推文。我的理解正确吗?或者每条推文形成一个新的 RDD?我有点困惑...
在一个批次中,您有一个 RDD,其中包含以 2 秒为间隔出现的所有状态。然后您可以单独处理这些状态。这是一个简短的例子:
JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);
inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
@Override
public Void call(JavaRDD<Status> status, Time time) throws Exception {
List<Status> statuses=status.collect();
for(Status st:statuses){
System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());
//Process and store status somewhere
}
return null;
}});
ctx.start();
ctx.awaitTermination();
}
希望我没有误解你的问题。
卓然
我是 Spark 和 Spark Streaming 的新手。我正在处理 Twitter 流数据。我的任务涉及独立处理每条推文,例如计算每条推文中的单词数。根据我的阅读,每个输入批处理在 Spark Streaming 中的 RDD 上形成。因此,如果我给出 2 秒的批处理间隔,那么新的 RDD 将包含两秒内的所有推文,并且应用的任何转换都将应用于整个两秒的数据,并且无法在这两秒内处理单个推文。我的理解正确吗?或者每条推文形成一个新的 RDD?我有点困惑...
在一个批次中,您有一个 RDD,其中包含以 2 秒为间隔出现的所有状态。然后您可以单独处理这些状态。这是一个简短的例子:
JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);
inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
@Override
public Void call(JavaRDD<Status> status, Time time) throws Exception {
List<Status> statuses=status.collect();
for(Status st:statuses){
System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());
//Process and store status somewhere
}
return null;
}});
ctx.start();
ctx.awaitTermination();
}
希望我没有误解你的问题。
卓然