我应该以什么二进制格式将 Kafka 主题摄取到 Druid,Druid 的事件是从 FlinkKafkaProducer[<Scala case class>] 发送的?
In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]?
我有一个用 Scala 编写的数据管道需要改进。
- As-Is: Flink 1.8 -> (Tranquility, 正式支持 Druid 0.9.2) -> Druid 0.20.1
- 未来:Flink 1.11 -> Kafka -> Druid 0.20.1 使用推荐 Druid Kafka Indexing Service。
Flink应用生成Scala case的DataStreamclass(下面代码中的ExampleData)
根据Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can,Scala case class 属于“Flink-provided special serializers”,但我不确定它是如何序列化和因此,当 Druid 从 Kafka 主题读取它时,应该如何(即以什么格式)反序列化。
(除非有充分的理由,否则我不想向 Flink 应用程序添加依赖项,以降低维护成本)
所以我的问题是
- 我应该使用 the Druid doc 中提到的哪种(二进制)数据格式让 Druid 从 Kafka 主题中摄取事件?
- 或者如何在 Flink 端指定数据格式以便 Druid 可以读取(通过 Kafka)?
- 关于数据格式,Kafka端是否也需要设置?
感谢您提供的所有 help/info,感谢您的关注。
代码
Scala 案例class
case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal) extends BaseData
trait BaseData {
val timestamp: Long
val name: String
}
Flink -> Kafka
val props: ParameterTool = ...
...
KafkaSink.sendToKafka(exampleDataStream, props)
object KafkaSink {
def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
val topic: String = ...
val properties: Properties = ...
val producer = new FlinkKafkaProducer[ExampleData](
topic,
new ExampleDataSerializationSchema(topic),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
exampleDataStream.addSink(producer)
}
}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData]{
val mapper = new ObjectMapper()
// https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-serializationschema
//
override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
val elementAsBytes: Array[Byte] =
try {
mapper.writeValueAsBytes(element)
} catch {
case e: JsonProcessingException => {
Array[Byte]()
}
}
new ProducerRecord[Array[Byte], Array[Byte]](topic, elementAsBytes)
}
}
环境
- Druid 0.20.1(如果绝对需要可以升级更多)
- Flink 1.11.2(由于某些原因应该是 1.11.x)
- Kafka 0.11.x(我们对版本控制不多)
参考资料
你的序列化器使用 Jackson/JSON,不管它是 Scala class,所以你应该使用 Driud 的 JSON one
"ioConfig": {
"topic": "your_topic_name",
"inputFormat": {
"type": "json"
},
FWIW,如果您将 Confluent Schema Registry 添加到您的环境中,则无需为每个 class 编写自己的 SerializationSchema,因为您可以使用 ConfluentRegistryAvroSerializationSchema
(Avro 格式)和 Druid似乎也支持 Schema Registry(至少对于 Avro,不确定 Protobuf 或 JSONSchema)。
我有一个用 Scala 编写的数据管道需要改进。
- As-Is: Flink 1.8 -> (Tranquility, 正式支持 Druid 0.9.2) -> Druid 0.20.1
- 未来:Flink 1.11 -> Kafka -> Druid 0.20.1 使用推荐 Druid Kafka Indexing Service。
Flink应用生成Scala case的DataStreamclass(下面代码中的ExampleData)
根据Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can,Scala case class 属于“Flink-provided special serializers”,但我不确定它是如何序列化和因此,当 Druid 从 Kafka 主题读取它时,应该如何(即以什么格式)反序列化。
(除非有充分的理由,否则我不想向 Flink 应用程序添加依赖项,以降低维护成本)
所以我的问题是
- 我应该使用 the Druid doc 中提到的哪种(二进制)数据格式让 Druid 从 Kafka 主题中摄取事件?
- 或者如何在 Flink 端指定数据格式以便 Druid 可以读取(通过 Kafka)?
- 关于数据格式,Kafka端是否也需要设置?
感谢您提供的所有 help/info,感谢您的关注。
代码
Scala 案例class
case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal) extends BaseData
trait BaseData {
val timestamp: Long
val name: String
}
Flink -> Kafka
val props: ParameterTool = ...
...
KafkaSink.sendToKafka(exampleDataStream, props)
object KafkaSink {
def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
val topic: String = ...
val properties: Properties = ...
val producer = new FlinkKafkaProducer[ExampleData](
topic,
new ExampleDataSerializationSchema(topic),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
exampleDataStream.addSink(producer)
}
}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData]{
val mapper = new ObjectMapper()
// https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-serializationschema
//
override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
val elementAsBytes: Array[Byte] =
try {
mapper.writeValueAsBytes(element)
} catch {
case e: JsonProcessingException => {
Array[Byte]()
}
}
new ProducerRecord[Array[Byte], Array[Byte]](topic, elementAsBytes)
}
}
环境
- Druid 0.20.1(如果绝对需要可以升级更多)
- Flink 1.11.2(由于某些原因应该是 1.11.x)
- Kafka 0.11.x(我们对版本控制不多)
参考资料
你的序列化器使用 Jackson/JSON,不管它是 Scala class,所以你应该使用 Driud 的 JSON one
"ioConfig": {
"topic": "your_topic_name",
"inputFormat": {
"type": "json"
},
FWIW,如果您将 Confluent Schema Registry 添加到您的环境中,则无需为每个 class 编写自己的 SerializationSchema,因为您可以使用 ConfluentRegistryAvroSerializationSchema
(Avro 格式)和 Druid似乎也支持 Schema Registry(至少对于 Avro,不确定 Protobuf 或 JSONSchema)。