Kafka producer: send Avro as Array[Byte] without schema
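I'm trying to set up a simple Kafka stack locally, and now I need to create a toy producer. This post: https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html (the code I'm interested in is below) is almost exactly what I want, except:
Here the producer sends a GenericData.Record object, so the whole schema is sent and the schema registry isn't used. I want to send an Array[Byte] whose first few bytes are the schema ID and whose remaining bytes are the data, without the schema (or at least I think that's the best approach).
The code I'm referring to: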
```scala
import java.util.Properties
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String, favoriteNumber: Int, favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)
  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers", kafkaBootstrapServer)
  props.put("schema.registry.url", schemaRegistryUrl)
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks", "1")

  val producer = new KafkaProducer[String, GenericData.Record](props)
  val schemaParser = new Parser
  val key = "key1"
  val valueSchemaJson =
    s"""
    {
      "namespace": "com.avro.junkie",
      "type": "record",
      "name": "User2",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber", "type": "int"},
        {"name": "favoriteColor", "type": "string"}
      ]
    }
    """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary", 840, "Green")
  avroRecord.put("name", mary.name)
  avroRecord.put("favoriteNumber", mary.favoriteNumber)
  avroRecord.put("favoriteColor", mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users", key, avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage, e)
    }
  }
}
```
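Questions:
- I don't know how to retrieve the schema's ID from the schema registry
- I don't know how to send only the data plus the ID as an Array[Byte], without the schema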
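To make the first question concrete: the Schema Registry exposes a REST API, and `GET /subjects/{subject}/versions/latest` returns JSON that includes the schema's `id`. The sketch below assumes the default subject naming (`<topic>-value`, so `users-value` here), that the schema is already registered (KafkaAvroSerializer auto-registers by default, so running the code above once should do it), and Java 11+ for `java.net.http`. The object `SchemaIdLookup` and its helpers are hypothetical, and the regex in `extractId` is a deliberately crude stand-in for a real JSON parser.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper: look up a schema ID through the Schema Registry REST API.
object SchemaIdLookup {
  // Assumes the default TopicNameStrategy: value schemas live under "<topic>-value".
  def valueSubject(topic: String): String = s"$topic-value"

  // REST endpoint returning the latest registered version of a subject.
  def latestVersionUri(registryUrl: String, subject: String): URI =
    URI.create(s"${registryUrl.stripSuffix("/")}/subjects/$subject/versions/latest")

  // Crude extraction of the top-level "id" field; use a JSON library in real code.
  private val IdPattern = "\"id\"\\s*:\\s*(\\d+)".r

  def extractId(json: String): Option[Int] =
    IdPattern.findFirstMatchIn(json).map(_.group(1).toInt)

  // Performs the HTTP call; needs a running Schema Registry at registryUrl.
  def fetchLatestId(registryUrl: String, subject: String): Option[Int] = {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(latestVersionUri(registryUrl, subject)).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    if (response.statusCode() == 200) extractId(response.body()) else None
  }
}
```

In a real producer you would more likely use Confluent's `CachedSchemaRegistryClient`, which caches IDs; the raw REST call is shown only because it needs no extra dependency.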
I know how to write the whole Avro record to an Array[Byte]:
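```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// GenericDatumWriter is the writer meant for GenericRecord instances
val writer = new GenericDatumWriter[GenericRecord](valueSchemaAvro)
val out = new ByteArrayOutputStream
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder) // but here I am also writing the schema, right?
encoder.flush()
out.close()
out.toByteArray
```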
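On the comment in that snippet: according to the Avro specification's binary encoding, a record is just its field values concatenated in schema order, with strings written as a zig-zag varint length followed by UTF-8 bytes and ints written as zig-zag varints. The hand-rolled sketch below (a hypothetical `AvroBinarySketch`, not part of the Avro library) mirrors those rules for the `User2` record so you can see that no schema bytes appear; the real `GenericDatumWriter` with a binary encoder should produce the same bytes for this record.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Illustration only: mirrors the Avro binary encoding rules by hand.
object AvroBinarySketch {
  // Avro ints/longs: zig-zag encode, then write as a base-128 varint (low bits first).
  def writeLong(out: ByteArrayOutputStream, value: Long): Unit = {
    var n = (value << 1) ^ (value >> 63) // zig-zag
    while ((n & ~0x7FL) != 0L) {
      out.write(((n & 0x7F) | 0x80).toInt) // 7 data bits plus continuation bit
      n = n >>> 7
    }
    out.write(n.toInt)
  }

  // Avro strings: length (as a long) followed by the UTF-8 bytes.
  def writeString(out: ByteArrayOutputStream, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    writeLong(out, bytes.length.toLong)
    out.write(bytes)
  }

  // Fields in schema order; no field names, no schema header.
  def encodeUser(name: String, favoriteNumber: Int, favoriteColor: String): Array[Byte] = {
    val out = new ByteArrayOutputStream
    writeString(out, name)
    writeLong(out, favoriteNumber.toLong)
    writeString(out, favoriteColor)
    out.toByteArray
  }
}
```

For `User("Mary", 840, "Green")` this yields 13 bytes: `0x08` followed by `Mary`, then `0x90 0x0D` for 840, then `0x0A` followed by `Green`.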
Thanks a lot!
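The first code does use the Schema Registry: KafkaAvroSerializer looks up (and by default registers) the schema, gets its ID, and puts that ID in place of the schema in the byte array for you, so what reaches Kafka is a magic byte, the 4-byte schema ID, and then the Avro-encoded data. Likewise, the second snippet does not write the schema: a DatumWriter with a binary encoder emits only the encoded field values.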
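What KafkaAvroSerializer writes is what Confluent calls the wire format: one magic byte (0), a 4-byte big-endian schema ID, then the plain Avro binary payload. A minimal framing/unframing sketch using only `java.nio.ByteBuffer` follows; `ConfluentFraming` is a hypothetical name, and the schema ID is assumed to come from the registry.

```scala
import java.nio.ByteBuffer

// Hypothetical helper for the Confluent wire format:
// [magic byte 0][schema ID, 4 bytes big-endian][Avro binary payload]
object ConfluentFraming {
  val MagicByte: Byte = 0

  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length) // big-endian by default
    buffer.put(MagicByte)
    buffer.putInt(schemaId)
    buffer.put(avroPayload)
    buffer.array()
  }

  // Returns (schemaId, avroPayload); fails if the magic byte is not 0.
  def unframe(message: Array[Byte]): (Int, Array[Byte]) = {
    val buffer = ByteBuffer.wrap(message)
    val magic = buffer.get()
    require(magic == MagicByte, s"Unknown magic byte: $magic")
    val schemaId = buffer.getInt()
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    (schemaId, payload)
  }
}
```

Framing `out.toByteArray` this way and sending it through a ByteArraySerializer producer should give messages that KafkaAvroDeserializer can read, provided the ID really is registered for that schema.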
If you want to bypass the Schema Registry, use ByteArraySerializer as the value serializer and send the result of out.toByteArray from the second code block to the producer.
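A minimal sketch of the producer settings for that approach, assuming String keys; compared with the original code, only the serializer classes change and `schema.registry.url` is dropped. `ByteArrayProducerConfig` is a hypothetical helper.

```scala
import java.util.Properties

// Hypothetical helper: producer settings for sending pre-serialized Avro bytes.
object ByteArrayProducerConfig {
  def props(bootstrapServers: String): Properties = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", bootstrapServers)
    p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Values are passed through as-is; no Schema Registry lookup on send.
    p.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    p.setProperty("acks", "1")
    p
  }
}
```

With kafka-clients on the classpath you would then create `new KafkaProducer[String, Array[Byte]](ByteArrayProducerConfig.props(kafkaBootstrapServer))` and send `new ProducerRecord[String, Array[Byte]]("users", key, out.toByteArray)`. Bytes sent this way carry no schema information at all, so consumers must know the writer schema by other means.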