Apache Spark 在 updateStateByKey() 之后合并
Apache Spark merge after updateStateByKey()
我正在尝试合并两个流,其中之一应该是有状态的(比如不经常更新的静态数据):
SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));
context.checkpoint(".");
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998);
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999);
JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
});
JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> {
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
});
pairDataStream.join(pairRefDataStream).print();
context.start();
context.awaitTermination();
当我将 1 aaa
写入第一个流并将 1 111
写入第二个流时,立即一切正常,我看到合并的结果。但是,当我在一分钟后将 1 bbb
写入第一个流时,我什么也没看到。
我是否正确理解了 updateStateByKey()
的作用?还是我错了?
updateStateByKey
完全按照您的要求进行。特别是如果当前 window 不包含任何数据 (strings.isEmpty()
) 你指示它忘记 (return Optional.absent();
):
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
而您可能想要的是 return 以前的状态:
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return stringOptional;
我正在尝试合并两个流,其中之一应该是有状态的(比如不经常更新的静态数据):
SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));
context.checkpoint(".");
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998);
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999);
JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
});
JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> {
String[] tmp = e.split(" ");
return new Tuple2<>(tmp[0], tmp[1]);
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> {
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
});
pairDataStream.join(pairRefDataStream).print();
context.start();
context.awaitTermination();
当我将 1 aaa
写入第一个流并将 1 111
写入第二个流时,立即一切正常,我看到合并的结果。但是,当我在一分钟后将 1 bbb
写入第一个流时,我什么也没看到。
我是否正确理解了 updateStateByKey()
的作用?还是我错了?
updateStateByKey
完全按照您的要求进行。特别是如果当前 window 不包含任何数据 (strings.isEmpty()
) 你指示它忘记 (return Optional.absent();
):
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return Optional.absent();
而您可能想要的是 return 以前的状态:
if (!strings.isEmpty()) {
return Optional.of(strings.get(0));
}
return stringOptional;