java.io.IOException:无法将语句写入 batch_layer.test。最新的异常是 Key may not be empty
java.io.IOException: Failed to write statements to batch_layer.test. The latest exception was Key may not be empty
我正在尝试计算文本中的单词数并将结果保存到 Cassandra 数据库中。
Producer从文件中读取数据并发送给kafka。 Consumer使用spark streaming读取并处理日期,然后将计算结果发送给table.
我的制作人长这样:
object ProducerPlayground extends App {
val topicName = "test"
private def createProducer: Properties = {
val producerProperties = new Properties()
producerProperties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092"
)
producerProperties.setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[IntegerSerializer].getName
)
producerProperties.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName
)
producerProperties
}
val producer = new KafkaProducer[Int, String](createProducer)
val source = Source.fromFile("G:\text.txt", "UTF-8")
val lines = source.getLines()
var key = 0
for (line <- lines) {
producer.send(new ProducerRecord[Int, String](topicName, key, line))
key += 1
}
source.close()
producer.flush()
}
消费者看起来像这样:
object BatchLayer {
def main(args: Array[String]) {
val brokers = "localhost:9092"
val topics = "test"
val groupId = "groupId-1"
val sparkConf = new SparkConf()
.setAppName("BatchLayer")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val sc = ssc.sparkContext
sc.setLogLevel("OFF")
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
)
val stream =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)
val cass = CassandraConnector(sparkConf)
cass.withSessionDo { session =>
session.execute(
s"CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
)
session.execute(s"CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
session.execute(s"TRUNCATE batch_layer.test")
}
stream
.map(v => v.value())
.flatMap(x => x.split(" "))
.filter(x => !x.contains(Array('\n', '\t')))
.map(x => (x, 1))
.reduceByKey(_ + _)
.saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))
ssc.start()
ssc.awaitTermination()
}
}
启动生产者后,程序停止运行并出现此错误。我做错了什么?
在 2021 年使用遗留流已经没有什么意义了——使用起来非常麻烦,而且你还需要跟踪 Kafka 等的偏移量。最好使用 Structured Streaming instead——它会跟踪偏移量通过检查点,您将使用高级数据集 API 等。
在您的案例中,代码可能如下所示(未测试,但它是从 this working example 中采用的):
val streamingInputDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
.selectExpr("split(value, '\w+', -1) as words")
.selectExpr("explode(words) as word")
.filter("word != ''")
.groupBy($"word")
.count()
.select($"word", $"count")
// create table ...
val query = wordsCountsDF.writeStream
.outputMode(OutputMode.Update)
.format("org.apache.spark.sql.cassandra")
.option("checkpointLocation", "path_to_checkpoint)
.option("keyspace", "test")
.option("table", "<table_name>")
.start()
query.awaitTermination()
P.S。在您的示例中,最可能的错误是您试图直接在 DStream 上使用 .saveToCassandra
- 它不会以这种方式工作。
我正在尝试计算文本中的单词数并将结果保存到 Cassandra 数据库中。 Producer从文件中读取数据并发送给kafka。 Consumer使用spark streaming读取并处理日期,然后将计算结果发送给table.
我的制作人长这样:
object ProducerPlayground extends App {
val topicName = "test"
private def createProducer: Properties = {
val producerProperties = new Properties()
producerProperties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092"
)
producerProperties.setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[IntegerSerializer].getName
)
producerProperties.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName
)
producerProperties
}
val producer = new KafkaProducer[Int, String](createProducer)
val source = Source.fromFile("G:\text.txt", "UTF-8")
val lines = source.getLines()
var key = 0
for (line <- lines) {
producer.send(new ProducerRecord[Int, String](topicName, key, line))
key += 1
}
source.close()
producer.flush()
}
消费者看起来像这样:
object BatchLayer {
def main(args: Array[String]) {
val brokers = "localhost:9092"
val topics = "test"
val groupId = "groupId-1"
val sparkConf = new SparkConf()
.setAppName("BatchLayer")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val sc = ssc.sparkContext
sc.setLogLevel("OFF")
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
)
val stream =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)
val cass = CassandraConnector(sparkConf)
cass.withSessionDo { session =>
session.execute(
s"CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
)
session.execute(s"CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
session.execute(s"TRUNCATE batch_layer.test")
}
stream
.map(v => v.value())
.flatMap(x => x.split(" "))
.filter(x => !x.contains(Array('\n', '\t')))
.map(x => (x, 1))
.reduceByKey(_ + _)
.saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))
ssc.start()
ssc.awaitTermination()
}
}
启动生产者后,程序停止运行并出现此错误。我做错了什么?
在 2021 年使用遗留流已经没有什么意义了——使用起来非常麻烦,而且你还需要跟踪 Kafka 等的偏移量。最好使用 Structured Streaming instead——它会跟踪偏移量通过检查点,您将使用高级数据集 API 等。
在您的案例中,代码可能如下所示(未测试,但它是从 this working example 中采用的):
val streamingInputDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
.selectExpr("split(value, '\w+', -1) as words")
.selectExpr("explode(words) as word")
.filter("word != ''")
.groupBy($"word")
.count()
.select($"word", $"count")
// create table ...
val query = wordsCountsDF.writeStream
.outputMode(OutputMode.Update)
.format("org.apache.spark.sql.cassandra")
.option("checkpointLocation", "path_to_checkpoint)
.option("keyspace", "test")
.option("table", "<table_name>")
.start()
query.awaitTermination()
P.S。在您的示例中,最可能的错误是您试图直接在 DStream 上使用 .saveToCassandra
- 它不会以这种方式工作。