我的 Kafka 连接器生成的 Kafka 消息的每条消息开头的两个奇怪字节

Two strange bytes at the beginning of each message of a Kafka message produced by my Kafka Connector

我开发了一个 Kafka 连接器,它简单地为从外部 API 检索的文件中的每一行创建消息。它工作得很好,但现在我尝试使用这些消息,并且在每个值的开头都有两个奇怪的字节。我可以使用控制台消费者和我的 kafka 流处理器重现该问题。

�168410002,OpenX Market,459980962,OpenX_Bidder_Order_merkur_bidder_800x250,313115722,OpenX_Bidder_ANY_LI_merkur_800x250_550,106800839362,OpenX_Bidder_Creative_merkur_800x250_2,10

源文件很好,甚至在创建 SourceRecord 之前的 printlns 也不显示这两个字节。我之前使用了一个带有一个字段的结构,现在使用了一个简单的字符串模式,但我仍然遇到同样的问题:

def convert(line: String, ...) = {
...
val record = new SourceRecord(
  Partition.sole(partition),
  offset.forConnectApi,
  topic,
  Schema.STRING_SCHEMA,
  line
)
...

所以在上面的代码中,如果我添加 println(line) 则不会显示任何奇怪的字符。

您似乎在连接器中使用了 AvroConverter 或 JsonConverter。尝试在 worker 中的 key.converter 和 value.converter 中使用 Kafka 附带的 StringConverter 进行连接。这会将数据编码为不应包含这些额外内容的字符串。