Unable to iterate over the list of keys retrieved from converting a DStream to a List while using Spark Streaming with Kafka
Below is my code for Spark Streaming with Kafka.
Here I am trying to get the keys of the batch as a DStream and then convert it to a LIST, so that I can iterate over it and put the data related to each key in an HDFS folder named after the key.
The key is basically - Schema.Table_name
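For clarity, the directory layout I am trying to produce (based on the write call in the code below, where args(6) carries the pharmacy location) is roughly:
<hdfs.streamoutpath1>/<Schema.Table_name>/<pharmacy_location>/<date>/<hour>/<batch_timestamp>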
val ssc = new StreamingContext(sparkConf, Seconds(args{7}.toLong)) // configured to run for every 60 seconds
val warehouseLocation="Spark-warehouse"
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
import spark.implicits._
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> conf.getString("kafka.brokers"),
"zookeeper.connect" -> conf.getString("kafka.zookeeper"),
"group.id" -> conf.getString("kafka.consumergroups"),
"auto.offset.reset" -> args { 1 },
"enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"security.protocol" -> "SASL_PLAINTEXT",
"session.timeout.ms" -> args { 2 },
"max.poll.records" -> args { 3 },
"request.timeout.ms" -> args { 4 },
"fetch.max.wait.ms" -> args { 5 })
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.
Subscribe[String, String](topicsSet, kafkaParams))
Extracting the keys, but it is of type DStream[String]
val keys = messages.map(x=>(x.key()))
var final_list_of_keys = List[String]()
Converting it to a list and updating the var final_list_of_keys
keys.foreachRDD( rdd => {
val df_keys = spark.read.json(rdd).distinct().toDF().persist(StorageLevel.MEMORY_ONLY)
df_keys.show()
val comma_separated_keys= df_keys.distinct().collect().mkString("").replace("[","").replace("]",",")
final_list_of_keys= comma_separated_keys.split(",").toList
Now trying to iterate over the list.
for ( i <- final_list_of_keys)
{
println(i)
val message1 = messages.filter(x => x.key().toString().equals(i)).map(x=>x.value()).persist(StorageLevel.MEMORY_ONLY) //.toString())
message1.foreachRDD((rdd, batchTime) => {
if (!rdd.isEmpty())
{
val df1 = spark.read.json(rdd).persist(StorageLevel.MEMORY_ONLY) //.withColumn("pharmacy_location",lit(args{6}))
val df2=df1.withColumn("message",struct( struct($"message.data.*",lit(args{6}).as("pharmacy_location")).alias("data"), struct($"message.headers.*").as("headers"))).persist(StorageLevel.MEMORY_ONLY)
val df3= df2.drop("headers").drop("messageSchema").drop("messageSchemaId").persist(StorageLevel.MEMORY_ONLY)
df3.coalesce(1).write.json(conf.getString("hdfs.streamoutpath1")+ PATH_SEPERATOR + i + PATH_SEPERATOR + args{6}+ PATH_SEPERATOR+ date_today.format(System.currentTimeMillis())
+ PATH_SEPERATOR + date_today_hour.format(System.currentTimeMillis()) + PATH_SEPERATOR + System.currentTimeMillis())
df1.unpersist
df2.unpersist()
df3.unpersist()
}
})
try
{
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // push it back
}
}
catch
{
case e: BlockMissingException => e.printStackTrace()
case e: IOException => e.printStackTrace()
case e:Throwable => e.printStackTrace()
}
}
ssc.start()
ssc.awaitTermination()
But I am getting the error - Adding new inputs, transformations, and output operations after starting a context is not supported
When I try to keep the for loop over the list outside keys.foreachRDD, the list does not get updated and stays empty.
Can someone suggest how I can rework this code so that the keys end up in a list and I can then iterate over them to put the data into the correct directories?
From my research I came across this post -
Similar post but unable to gather any solution from it
Also, since I am using map and filter inside foreachRDD and then another foreachRDD inside it, this may be causing the issue, as sketched below.
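To make the suspected problem concrete, here is a minimal sketch of the shape of my current code (same names as above): a new output operation is created on the messages DStream from inside keys.foreachRDD, i.e. once the context is already running:
keys.foreachRDD { rdd =>
  // ... build final_list_of_keys from this batch ...
  for (i <- final_list_of_keys) {
    // filter() on the DStream plus the nested foreachRDD register new
    // streaming operations per batch, which is not allowed after ssc.start()
    messages.filter(_.key().equals(i)).map(_.value()).foreachRDD { rdd2 =>
      // ... write rdd2 to HDFS ...
    }
  }
}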
Reference post -
Below is the code for the question - the distinct keys are collected and each one filtered inside a single foreachRDD on messages, instead of creating new DStream operations per key:
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.
Subscribe[String, String](topicsSet, kafkaParams)).persist(StorageLevel.MEMORY_ONLY)
messages.foreachRDD((rdd,batchTime) => ///foreachRDD processes each batch RDD of the stream; it gives us the RDD and we also take the batch time
{
val table_list=rdd.map(x => x.key()).distinct().collect() ////Kafka sends data as key-value pairs,
///here the key is the table name, so first we collect all the distinct keys in this batch (e.g. 5 tables)
val rddList = table_list.map(x=>(x,(rdd.filter(y=>y.key().equals(x)))))
///here x is the table name and we filter the rdd down to the records whose key equals that table name
///Now rddList contains each key (table) paired with the records belonging to it
rddList.foreach(tuple => //plain Scala foreach, not parallel - we want to go table by table; each tuple is (table name, filtered records)
{
val tableName= tuple._1.toString() //tuple._1 will be the table name
val tableRdd= tuple._2.map(x=>(x.value())).persist(StorageLevel.MEMORY_ONLY) // .toDF()
///tuple._2 holds the filtered key-value records; we take just the values to write to HDFS
// val tableRdd= messages.filter(x => x.key().toString().equals(tableName)).map(x=>x.value()).persist(StorageLevel.MEMORY_ONLY)
println(tableName)
/* Your logic */
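A minimal sketch of how the /* Your logic */ part could be filled in, reusing the value transformations and write path from the original code above (spark, args, conf, PATH_SEPERATOR, date_today and date_today_hour are assumed to be in scope exactly as before, and struct/lit/$ come from org.apache.spark.sql.functions and spark.implicits):
if (!tableRdd.isEmpty()) {
  // build a DataFrame from the JSON values of this table
  val df1 = spark.read.json(tableRdd)
  // inject the pharmacy_location into message.data, as in the original code
  val df2 = df1.withColumn("message",
    struct(
      struct($"message.data.*", lit(args(6)).as("pharmacy_location")).alias("data"),
      struct($"message.headers.*").as("headers")))
  val df3 = df2.drop("headers").drop("messageSchema").drop("messageSchemaId")
  // one output folder per key (table), per location, per batch
  df3.coalesce(1).write.json(
    conf.getString("hdfs.streamoutpath1") + PATH_SEPERATOR + tableName +
      PATH_SEPERATOR + args(6) +
      PATH_SEPERATOR + date_today.format(System.currentTimeMillis()) +
      PATH_SEPERATOR + date_today_hour.format(System.currentTimeMillis()) +
      PATH_SEPERATOR + System.currentTimeMillis())
}
The offsets can then be committed once per batch, after the rddList.foreach loop but still inside messages.foreachRDD, using the same calls as in the original code:
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)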