我应该以什么二进制格式将 Kafka 主题摄取到 Druid,Druid 的事件是从 FlinkKafkaProducer[<Scala case class>] 发送的?

In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]?

我有一个用 Scala 编写的数据管道需要改进。

Flink应用生成Scala case的DataStreamclass(下面代码中的ExampleData)

根据Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can,Scala case class 属于“Flink-provided special serializers”,但我不确定它是如何序列化和因此,当 Druid 从 Kafka 主题读取它时,应该如何(即以什么格式)反序列化。

(除非有充分的理由,否则我不想向 Flink 应用程序添加依赖项,以降低维护成本)

所以我的问题是

感谢您提供的所有 help/info,感谢您的关注。

代码

Scala 案例class

case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal) extends BaseData

trait BaseData {
    val timestamp: Long
    val name: String
}

Flink -> Kafka

val props: ParameterTool = ...
...
KafkaSink.sendToKafka(exampleDataStream, props)
object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    val topic: String = ...
    val properties: Properties = ... 

    val producer = new FlinkKafkaProducer[ExampleData](
      topic,
      new ExampleDataSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    exampleDataStream.addSink(producer)
  }
}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper

class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData]{
  val mapper = new ObjectMapper()

  // https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-serializationschema
  // 
  override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val elementAsBytes: Array[Byte] =
      try {
        mapper.writeValueAsBytes(element)
      } catch {
        case e: JsonProcessingException => {
          Array[Byte]()
        }
      }
    new ProducerRecord[Array[Byte], Array[Byte]](topic, elementAsBytes)
    }
}

环境

参考资料

你的序列化器使用 Jackson/JSON,不管它是 Scala class,所以你应该使用 Driud 的 JSON one

  "ioConfig": {
    "topic": "your_topic_name",
    "inputFormat": {
      "type": "json"
    },

FWIW,如果您将 Confluent Schema Registry 添加到您的环境中,则无需为每个 class 编写自己的 SerializationSchema,因为您可以使用 ConfluentRegistryAvroSerializationSchema(Avro 格式)和 Druid似乎也支持 Schema Registry(至少对于 Avro,不确定 Protobuf 或 JSONSchema)。