Kafka 融合代理 api - 发送消息 - 内部服务器错误

Kafka confluent proxy api - send message - Internal server error

我正在尝试将 Confluent kafka 代理 api 包装在一个 class 中,它将处理生产和消费。
按照这个 link: https://docs.confluent.io/platform/current/kafka-rest/api.html 我尝试按如下方式实现它:

    def send(self, topic, data):
        try:
            r = requests.post(self._url('/topics/' + topic), json=data, headers=headers_v2)
            if not r.ok:
                raise Exception("Error: ", r.reason)
        except Exception as e:
            print(" ")
            print('Event streams send request failed')
            print(Exception, e)
            print(" ")
            return e

    

但我最终使用了 2 个版本的 api (v2/v3),因为我在一个实现中没有找到一些 api,反之亦然...
比如我在v2中没有找到如何创建topic,所以我用v3实现了。

我现在的问题是 send 方法,我得到 Internal server error 但我找不到原因!
可能是因为创建主题是使用 v3 完成的,而我正在尝试使用 v2 生成消息。

我将发送的数据负载更改为:
data = {"records": [{"value": data}]} 发送通过,

投票通过时使用:
r = requests.get(self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'), headers={'Accept': 'application/vnd.kafka.json.v2+json'})