在 SchemaRegistryClient Confluent Kafka 中禁用证书验证
Disable Certificate validation in SchemaRegistryClient Confluent Kafka
所以,我想阅读来自 kafka (Confluent) 的主题,其中数据采用 Avro 格式。
出于某些不可避免的原因,我想禁用证书验证。
我正在使用 security.protocol= SASL_SSL 和 sasl.mechanisms= OAUTHBEARER
我可以通过禁用 ssl 证书验证来连接到 Kafka
'enable.ssl.certificate.verification': 'false'
现在,我 运行 在尝试使用架构注册表反序列化值时遇到了问题。
avro 反序列化器需要模式注册表客户端和模式(可选)。我都通过了。我通过使用 verify=False 发出单独的请求来获取模式,从而获得了第二个参数的值,这工作正常。但是当我尝试从中创建反序列化消费者时出现问题。
基本上是代码的框架(以及问题所在的注释)
topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')
schema_registry_response = requests.get(url, verify=False) #here I intentionally used verify=False to get the schema
schema_registry_response.raise_for_status()
consumption_schema=schema_registry_response.json()['schema']
print(consumption_schema) # This works fine
schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
avro_deserializer = AvroDeserializer(schema_registry_client,consumption_schema
) # This is the problem area which is called when the poll method is called
string_deserializer = StringDeserializer('utf_8')
basic_conf=_get_basic_configuration()
consumer_conf = {'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': 'myconsumergroupid',
'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)
# update default config with parameter
additional_consumer_conf={}
consumer_conf.update(additional_consumer_conf)
cn=DeserializingConsumer(consumer_conf)
cn.subscribe(['topicname'])
while True:
msg=cn.poll(10) # This fails because the deserializingconsumer calls the schema registry and the certificate validation fails
你上面看到的get_basic_configuration方法有
'enable.ssl.certificate.verification': 'false'
错误是
Max retries exceeded with url: /schemas/ids/140 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain
我已经查看了 SchemaRegistryClient 的代码 here 但我没有看到任何将证书验证设置为 false 的选项。
我还搜索了 S.O 个帖子和其他 Confluent 文档,看看是否能找到一些东西,但没有任何帮助
希望有人在这里了解更多?我愿意详细说明或澄清任何问题吗?
如果可能的话,我尽量不使用大量自定义逻辑来反序列化。
我找到了答案。
基本上就是这个S.Opost。特别是接受答案之后的答案,如果你使用的是 Confluent kafka
和 avro 文档 here ,因为我的模式不是来自文件,而是作为 http 响应,所以我必须使用 avro.schema.parse
来解析它
最终框架代码
topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')
schema_registry_response = requests.get(url, verify=False)
schema_registry_response.raise_for_status()
consumption_schema=schema_registry_response.json()['schema']
consumption_schema = avro.schema.parse(consumption_schema)
schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
basic_conf=_get_basic_configuration()
consumer_conf = {
'group.id': 'myconsumergroupid',
'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)
cn=Consumer(consumer_conf)
cn.subscribe(['mytopic'])
reader = DatumReader(consumption_schema)
while True:
msg=cn.poll(10)
if msg is None:
break
m=msg.value()
message_bytes = io.BytesIO(m)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
print(event_dict)
所以,我想阅读来自 kafka (Confluent) 的主题,其中数据采用 Avro 格式。
出于某些不可避免的原因,我想禁用证书验证。
我正在使用 security.protocol= SASL_SSL 和 sasl.mechanisms= OAUTHBEARER
我可以通过禁用 ssl 证书验证来连接到 Kafka
'enable.ssl.certificate.verification': 'false'
现在,我 运行 在尝试使用架构注册表反序列化值时遇到了问题。 avro 反序列化器需要模式注册表客户端和模式(可选)。我都通过了。我通过使用 verify=False 发出单独的请求来获取模式,从而获得了第二个参数的值,这工作正常。但是当我尝试从中创建反序列化消费者时出现问题。
基本上是代码的框架(以及问题所在的注释)
topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')
schema_registry_response = requests.get(url, verify=False) #here I intentionally used verify=False to get the schema
schema_registry_response.raise_for_status()
consumption_schema=schema_registry_response.json()['schema']
print(consumption_schema) # This works fine
schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
avro_deserializer = AvroDeserializer(schema_registry_client,consumption_schema
) # This is the problem area which is called when the poll method is called
string_deserializer = StringDeserializer('utf_8')
basic_conf=_get_basic_configuration()
consumer_conf = {'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': 'myconsumergroupid',
'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)
# update default config with parameter
additional_consumer_conf={}
consumer_conf.update(additional_consumer_conf)
cn=DeserializingConsumer(consumer_conf)
cn.subscribe(['topicname'])
while True:
msg=cn.poll(10) # This fails because the deserializingconsumer calls the schema registry and the certificate validation fails
你上面看到的get_basic_configuration方法有
'enable.ssl.certificate.verification': 'false'
错误是
Max retries exceeded with url: /schemas/ids/140 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain
我已经查看了 SchemaRegistryClient 的代码 here 但我没有看到任何将证书验证设置为 false 的选项。
我还搜索了 S.O 个帖子和其他 Confluent 文档,看看是否能找到一些东西,但没有任何帮助
希望有人在这里了解更多?我愿意详细说明或澄清任何问题吗? 如果可能的话,我尽量不使用大量自定义逻辑来反序列化。
我找到了答案。
基本上就是这个S.Opost
和 avro 文档 here ,因为我的模式不是来自文件,而是作为 http 响应,所以我必须使用 avro.schema.parse
来解析它最终框架代码
topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')
schema_registry_response = requests.get(url, verify=False)
schema_registry_response.raise_for_status()
consumption_schema=schema_registry_response.json()['schema']
consumption_schema = avro.schema.parse(consumption_schema)
schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
basic_conf=_get_basic_configuration()
consumer_conf = {
'group.id': 'myconsumergroupid',
'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)
cn=Consumer(consumer_conf)
cn.subscribe(['mytopic'])
reader = DatumReader(consumption_schema)
while True:
msg=cn.poll(10)
if msg is None:
break
m=msg.value()
message_bytes = io.BytesIO(m)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
print(event_dict)