使用 Spark SQL Streaming 时缺少 Avro Custom Header
Missing Avro Custom Header when using Spark SQL Streaming
在向 Kafka 发送 Avro GenericRecord 之前,像这样插入一个 Header。
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);
正在消耗记录。
使用 Spark Streaming 时,ConsumerRecord 中的 header 是完整的。
KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> {
rdd.foreach(record -> {
System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
});
});
;
但是当使用 Spark SQL Streaming 时,header 似乎丢失了。
StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() {
...
@Override
public void process(Row row) {
String topic = (String) row.get(2);
int partition = (int) row.get(3);
long offset = (long) row.get(4);
String key = new String((byte[]) row.get(0));
byte[] value = (byte[]) row.get(1);
ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
value);
//I need the schema to decode the Avro!
}
}).start();
使用 Spark SQL 流方法时,我在哪里可以找到自定义 header 值?
版本:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
更新
我尝试了 spark-sql_2.12 和 spark-sql-kafka-0-10_2.12 的 3.0.0-preview2。我加了
.option("includeHeaders", true)
但我仍然只从行中获取这些列。
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
结构化流中的 Kafka headers 仅从 3.0 开始支持:https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html
请寻找 includeHeaders
了解更多详情。
在向 Kafka 发送 Avro GenericRecord 之前,像这样插入一个 Header。
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);
正在消耗记录。
使用 Spark Streaming 时,ConsumerRecord 中的 header 是完整的。
KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> {
rdd.foreach(record -> {
System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
});
});
;
但是当使用 Spark SQL Streaming 时,header 似乎丢失了。
StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() {
...
@Override
public void process(Row row) {
String topic = (String) row.get(2);
int partition = (int) row.get(3);
long offset = (long) row.get(4);
String key = new String((byte[]) row.get(0));
byte[] value = (byte[]) row.get(1);
ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
value);
//I need the schema to decode the Avro!
}
}).start();
使用 Spark SQL 流方法时,我在哪里可以找到自定义 header 值?
版本:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
更新
我尝试了 spark-sql_2.12 和 spark-sql-kafka-0-10_2.12 的 3.0.0-preview2。我加了
.option("includeHeaders", true)
但我仍然只从行中获取这些列。
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
结构化流中的 Kafka headers 仅从 3.0 开始支持:https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html
请寻找 includeHeaders
了解更多详情。