无法保留 DStream 以供下一批使用
Not able to persist the DStream for use in next batch
JavaRDD<String> history_ = sc.emptyRDD();
java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);
JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
return new Tuple2< String,ArrayList<String> >(null,null);
});
JavaPairInputDStream<String, GenericData.Record> stream_1 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_1);
JavaPairInputDStream<String, GenericData.Record> stream_2 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_2);
然后进行一些转换并创建类型为
的 twp DStream Data_1 和 Data_2
JavaPairDStream<String, <ArrayList<String>>
并按如下方式进行连接,然后过滤掉那些没有连接键的记录,并将它们保存在历史记录中,以便通过与 Data_1
进行联合来在下一批中使用它
Data_1 = Data_1.union(history);
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
Data_1.leftOuterJoin(Data_2).cache();
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());
history = dstream_filtered.mapToPair(r -> {
return new Tuple2<>(r._1,r._2._1);
}).persist;
我在上一步之后得到了历史记录(通过将它保存到 hdfs 进行检查),但是在进行合并时这个历史记录仍然是空的。
"remember" 一个 DStream
在概念上是不可能的。 DStreams
是有时间限制的,在每个时钟节拍(称为 "batch interval")上,DStream
表示 在该时间段内在流中观察到的数据时间.
因此,我们不能保存 "old" DStream
以加入 "new" DStream
。所有 DStreams
都住在 "now"。
DStreams
的底层数据结构是 RDD
:每个批次间隔,我们的 DStream
将有 1 个 RDD
该间隔的数据。
RDD
s 表示分布式数据集合。 RDD
只要我们有对它们的引用,它们就是不可变的和永久的。
我们可以组合 RDD
和 DStream
来创建此处所需的 "history roll over"。
它看起来与问题的方法非常相似,但只使用 history
RDD
.
以下是建议更改的高级视图:
var history: RDD[(String, List[String]) = sc.emptyRDD()
val dstream1 = ...
val dstream2 = ...
val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)
... do stuff with joined as above, obtain dstreamFiltered ...
dstreamFiltered.foreachRDD{rdd =>
val formatted = rdd.map{case (k,(v1,v2)) => (k,v1)} // get rid of the join info
history.unpersist(false) // unpersist the 'old' history RDD
history = formatted // assign the new history
history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
history.count() //action to materialize this transformation
}
这只是一个起点。关于 checkpoint
ing 还有其他注意事项。否则 history
RDD 的谱系将无限增长,直到发生一些 Whosebug。这个博客对这种特殊技术非常完整:http://www.spark.tc/stateful-spark-streaming-using-transform/
我还建议您使用 Scala 而不是 Java。 Java 语法过于冗长,无法与 Spark Streaming 一起使用。
JavaRDD<String> history_ = sc.emptyRDD();
java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);
JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
return new Tuple2< String,ArrayList<String> >(null,null);
});
JavaPairInputDStream<String, GenericData.Record> stream_1 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_1);
JavaPairInputDStream<String, GenericData.Record> stream_2 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_2);
然后进行一些转换并创建类型为
的 twp DStream Data_1 和 Data_2JavaPairDStream<String, <ArrayList<String>>
并按如下方式进行连接,然后过滤掉那些没有连接键的记录,并将它们保存在历史记录中,以便通过与 Data_1
进行联合来在下一批中使用它 Data_1 = Data_1.union(history);
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
Data_1.leftOuterJoin(Data_2).cache();
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());
history = dstream_filtered.mapToPair(r -> {
return new Tuple2<>(r._1,r._2._1);
}).persist;
我在上一步之后得到了历史记录(通过将它保存到 hdfs 进行检查),但是在进行合并时这个历史记录仍然是空的。
"remember" 一个 DStream
在概念上是不可能的。 DStreams
是有时间限制的,在每个时钟节拍(称为 "batch interval")上,DStream
表示 在该时间段内在流中观察到的数据时间.
因此,我们不能保存 "old" DStream
以加入 "new" DStream
。所有 DStreams
都住在 "now"。
DStreams
的底层数据结构是 RDD
:每个批次间隔,我们的 DStream
将有 1 个 RDD
该间隔的数据。
RDD
s 表示分布式数据集合。 RDD
只要我们有对它们的引用,它们就是不可变的和永久的。
我们可以组合 RDD
和 DStream
来创建此处所需的 "history roll over"。
它看起来与问题的方法非常相似,但只使用 history
RDD
.
以下是建议更改的高级视图:
var history: RDD[(String, List[String]) = sc.emptyRDD()
val dstream1 = ...
val dstream2 = ...
val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)
... do stuff with joined as above, obtain dstreamFiltered ...
dstreamFiltered.foreachRDD{rdd =>
val formatted = rdd.map{case (k,(v1,v2)) => (k,v1)} // get rid of the join info
history.unpersist(false) // unpersist the 'old' history RDD
history = formatted // assign the new history
history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
history.count() //action to materialize this transformation
}
这只是一个起点。关于 checkpoint
ing 还有其他注意事项。否则 history
RDD 的谱系将无限增长,直到发生一些 Whosebug。这个博客对这种特殊技术非常完整:http://www.spark.tc/stateful-spark-streaming-using-transform/
我还建议您使用 Scala 而不是 Java。 Java 语法过于冗长,无法与 Spark Streaming 一起使用。