无法重新分区 DStream
Not able to repartition the DStream
val sparkConf = new SparkConf().setMaster("yarn-cluster")
.setAppName("SparkJob")
.set("spark.executor.memory","2G")
.set("spark.dynamicAllocation.executorIdleTimeout","5")
val streamingContext = new StreamingContext(sparkConf, Minutes(1))
var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_1))
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_2))
val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) =>
{
//some mapping
}
dstream_1.foreachRDD(r => r.repartition(500))
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd))
dstream_2.foreachRDD(r => r.repartition(500))
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2))
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2)
val filtered = fullJoinResult.filter(r => r._2._1.isEmpty)
filtered.foreachRDD{rdd =>
val formatted = rdd.map(r => (r._1 , r._2._2.get))
historyRdd_2.unpersist(false) // unpersist the 'old' history RDD
historyRdd_2 = formatted // assign the new history
historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}
val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty)
filteredStream.foreachRDD{rdd =>
val formatted = rdd.map(r => (r._1 , r._2._1.get))
historyRdd.unpersist(false) // unpersist the 'old' history RDD
historyRdd = formatted // assign the new history
historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}
streamingContext.start()
streamingContext.awaitTermination()
}
}
我无法使用上面的代码对 DStream 进行重新分区,我的输入得到了 128 个分区,这是没有的。的 Kafka partitons ,并且由于加入我需要随机读取和写入数据所以我想通过增加分区数来增加并行性。但是分区仍然是 same.Why 是这样吗?
就像 map
或 filter
一样,repartition
是 Spark 中的一个转换,意味着 3 件事:
- 它returns另一个不可变的RDD
- 很懒惰
- 它需要通过一些行动来实现
考虑这段代码:
dstream_1.foreachRDD(r => r.repartition(500))
在 foreachRDD
中使用 repartition
作为副作用没有任何作用。结果 RDD
从未被使用,因此重新分区永远不会发生。
我们应该 'chain' 此转换与作业中的其他操作一起进行。在这种情况下,实现此目的的一种简单方法是使用 transform
代替:
val repartitionedDStream = dstream_1.transform(rdd => rdd.repartition(500))
... use repartitionedDStream further on ...
val sparkConf = new SparkConf().setMaster("yarn-cluster")
.setAppName("SparkJob")
.set("spark.executor.memory","2G")
.set("spark.dynamicAllocation.executorIdleTimeout","5")
val streamingContext = new StreamingContext(sparkConf, Minutes(1))
var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_1))
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_2))
val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) =>
{
//some mapping
}
dstream_1.foreachRDD(r => r.repartition(500))
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd))
dstream_2.foreachRDD(r => r.repartition(500))
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2))
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2)
val filtered = fullJoinResult.filter(r => r._2._1.isEmpty)
filtered.foreachRDD{rdd =>
val formatted = rdd.map(r => (r._1 , r._2._2.get))
historyRdd_2.unpersist(false) // unpersist the 'old' history RDD
historyRdd_2 = formatted // assign the new history
historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}
val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty)
filteredStream.foreachRDD{rdd =>
val formatted = rdd.map(r => (r._1 , r._2._1.get))
historyRdd.unpersist(false) // unpersist the 'old' history RDD
historyRdd = formatted // assign the new history
historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}
streamingContext.start()
streamingContext.awaitTermination()
}
}
我无法使用上面的代码对 DStream 进行重新分区,我的输入得到了 128 个分区,这是没有的。的 Kafka partitons ,并且由于加入我需要随机读取和写入数据所以我想通过增加分区数来增加并行性。但是分区仍然是 same.Why 是这样吗?
就像 map
或 filter
一样,repartition
是 Spark 中的一个转换,意味着 3 件事:
- 它returns另一个不可变的RDD
- 很懒惰
- 它需要通过一些行动来实现
考虑这段代码:
dstream_1.foreachRDD(r => r.repartition(500))
在 foreachRDD
中使用 repartition
作为副作用没有任何作用。结果 RDD
从未被使用,因此重新分区永远不会发生。
我们应该 'chain' 此转换与作业中的其他操作一起进行。在这种情况下,实现此目的的一种简单方法是使用 transform
代替:
val repartitionedDStream = dstream_1.transform(rdd => rdd.repartition(500))
... use repartitionedDStream further on ...