为什么要将元数据添加到此 Kafka 连接器的输出中?
Why is meta data added to the output of this Kafka connector?
我有一个 Kafka 连接器,其中包含以下用于 SourceTask 实现中的 poll()
方法的代码。
@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);
return records;
}
如果我将消费者附加到数据主题,我会收到从连接器发送的以下消息:
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
如果我使用以下命令直接针对主题发布消息:
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
然后附加同一个消费者,我收到这条消息:
foo bar
这是我希望看到的连接器实现的输出,而不是我收到的 {"schema":...
消息。
如何更改 poll()
的实现,以便发送消息时架构元数据不会出现在消息的实际键和值中?
好吧,原来只是因为我在 connect-standalone.properties
中有以下几行
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
我应该有
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
作为替代解决方案,我还能够将以下设置从 true 更改为 false
value.converter.schemas.enable=false
然后在我的处理器中 class 我将代码更改为:
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
null, "bar")
};
这有所不同,因为我不再为该值指定架构。
我有一个 Kafka 连接器,其中包含以下用于 SourceTask 实现中的 poll()
方法的代码。
@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);
return records;
}
如果我将消费者附加到数据主题,我会收到从连接器发送的以下消息:
{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
如果我使用以下命令直接针对主题发布消息:
echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,
然后附加同一个消费者,我收到这条消息:
foo bar
这是我希望看到的连接器实现的输出,而不是我收到的 {"schema":...
消息。
如何更改 poll()
的实现,以便发送消息时架构元数据不会出现在消息的实际键和值中?
好吧,原来只是因为我在 connect-standalone.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
我应该有
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
作为替代解决方案,我还能够将以下设置从 true 更改为 false
value.converter.schemas.enable=false
然后在我的处理器中 class 我将代码更改为:
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
null, "bar")
};
这有所不同,因为我不再为该值指定架构。