Kafka 融合代理 api - 发送消息 - 内部服务器错误
Kafka confluent proxy api - send message - Internal server error
我正在尝试将 Confluent kafka 代理 api 包装在一个 class 中,它将处理生产和消费。
按照这个 link: https://docs.confluent.io/platform/current/kafka-rest/api.html 我尝试按如下方式实现它:
def send(self, topic, data):
try:
r = requests.post(self._url('/topics/' + topic), json=data, headers=headers_v2)
if not r.ok:
raise Exception("Error: ", r.reason)
except Exception as e:
print(" ")
print('Event streams send request failed')
print(Exception, e)
print(" ")
return e
但我最终使用了 2 个版本的 api (v2/v3),因为我在一个实现中没有找到一些 api,反之亦然...
比如我在v2中没有找到如何创建topic,所以我用v3实现了。
我现在的问题是 send
方法,我得到 Internal server error
但我找不到原因!
可能是因为创建主题是使用 v3 完成的,而我正在尝试使用 v2 生成消息。
我将发送的数据负载更改为:
data = {"records": [{"value": data}]}
发送通过,
投票通过时使用:
r = requests.get(self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'), headers={'Accept': 'application/vnd.kafka.json.v2+json'})
我正在尝试将 Confluent kafka 代理 api 包装在一个 class 中,它将处理生产和消费。
按照这个 link: https://docs.confluent.io/platform/current/kafka-rest/api.html 我尝试按如下方式实现它:
def send(self, topic, data):
try:
r = requests.post(self._url('/topics/' + topic), json=data, headers=headers_v2)
if not r.ok:
raise Exception("Error: ", r.reason)
except Exception as e:
print(" ")
print('Event streams send request failed')
print(Exception, e)
print(" ")
return e
但我最终使用了 2 个版本的 api (v2/v3),因为我在一个实现中没有找到一些 api,反之亦然...
比如我在v2中没有找到如何创建topic,所以我用v3实现了。
我现在的问题是 send
方法,我得到 Internal server error
但我找不到原因!
可能是因为创建主题是使用 v3 完成的,而我正在尝试使用 v2 生成消息。
我将发送的数据负载更改为:
data = {"records": [{"value": data}]}
发送通过,
投票通过时使用:
r = requests.get(self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'), headers={'Accept': 'application/vnd.kafka.json.v2+json'})