Union a List of Flume Receivers in Spark Streaming
To increase parallelism, as recommended in the Spark Streaming Programming Guide, I'm setting up multiple receivers and trying to union them into a single stream. This code works as expected:
private JavaDStream<SparkFlumeEvent> getEventsWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {
    List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();
    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }
    JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0)
            .union(receivers.get(1))
            .union(receivers.get(2))
            .union(receivers.get(3))
            .union(receivers.get(4))
            .union(receivers.get(5));
    return unionStreams;
}
But I don't actually know how many receivers my cluster will have until runtime. When I try to do this in a loop, I get an NPE.
private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {
    List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();
    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }
    JavaDStream<SparkFlumeEvent> unionStreams = null;
    for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers) {
        if (unionStreams == null) {
            unionStreams = receiver;
        } else {
            unionStreams.union(receiver);
        }
    }
    return unionStreams;
}
Error:
16/09/15 17:05:25 ERROR JobScheduler: Error in job generator
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration.apply(DStreamGraph.scala:172)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration.apply(DStreamGraph.scala:172)
at scala.collection.TraversableOnce$$anonfun$maxBy.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
16/09/15 17:05:25 INFO MemoryStore: ensureFreeSpace(15128) called with curMem=520144, maxMem=555755765
16/09/15 17:05:25 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 14.8 KB, free 529.5 MB)
Exception in thread "main"
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration.apply(DStreamGraph.scala:172)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration.apply(DStreamGraph.scala:172)
at scala.collection.TraversableOnce$$anonfun$maxBy.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
What is the correct way to do this?
Could you please try the code below? It should solve your problem. JavaStreamingContext has a union method that takes the first stream plus a list of the remaining streams, so it works for any number of receivers:
private JavaDStream<SparkFlumeEvent> getEvents(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {
    List<JavaDStream<SparkFlumeEvent>> receivers = new ArrayList<>();
    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }
    // jssc.union takes the first stream and a list of the rest, so the
    // number of receivers no longer needs to be known at compile time.
    return jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));
}
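For what it's worth, your loop likely fails because DStream.union returns a new stream rather than modifying the stream it is called on: the result of unionStreams.union(receiver) is discarded, so every receiver after the first is registered as an input stream but never feeds an output operation, which can leave its remember duration unset and trigger this NullPointerException in the job generator. Here is a minimal sketch of that alternative fix, reassigning the union result each iteration (the method name getEventsLoop is just for illustration):

private JavaDStream<SparkFlumeEvent> getEventsLoop(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {
    List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();
    for (String host : hosts) {
        for (String port : ports) {
            receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
        }
    }
    // union() returns a new DStream; assigning the result back is what the
    // original loop was missing.
    JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0);
    for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers.subList(1, receivers.size())) {
        unionStreams = unionStreams.union(receiver);
    }
    return unionStreams;
}

That said, jssc.union is the cleaner option, since it builds the whole union in one call.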