confluent-kafka-python json_producer: Unrecognized field: schemaType
I get the error "Unrecognized field: schemaType (HTTP status code 422, SR code 422)" when I run the json_producer.py example from the Confluent GitHub repository.
Here is my docker-compose file:
version: '3.3'
services:
  postgres:
    container_name: postgres
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=shipment_db
      - PGPASSWORD=password
    image: 'debezium/postgres:13'
  zookeeper:
    container_name: zookeeper
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    image: 'debezium/zookeeper:1.7'
  kafka:
    container_name: kafka
    ports:
      - '9092:9092'
    links:
      - 'zookeeper:zookeeper'
    image: 'debezium/kafka:1.7'
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
    #volumes:
    #  - ./kafka:/kafka/config:rw
  connect:
    image: debezium/connect:1.7
    hostname: connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: connect-cluster-A
      CONNECT_PLUGIN_PATH: /kafka/data, /kafka/connect
      #EXTERNAL_LIBS_DIR: /kafka/external_libs,/kafka/data
      CLASSPATH: /kafka/data/*
      KAFKA_CONNECT_PLUGINS_DIR: /kafka/data, /kafka/connect
      #CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect=DEBUG,org.apache.plc4x.kafka.Plc4xSinkConnector=DEBUG"
    volumes:
      - type: bind
        source: ./plugins
        target: /kafka/data
    depends_on:
      - zookeeper
      - kafka
      - postgres
    links:
      - zookeeper
      - kafka
      - postgres
  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.6
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - zookeeper
      - schema-registry
    ports:
      - "8088:8088"
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://connect:8083
The code for json_producer is in the repository here.
The error appears when I execute this command:
$ python3 json_producer.py -b 0.0.0.0:9092 -s http://0.0.0.0:8081 -t test
The stack trace is as follows:
Traceback (most recent call last):
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
    value = self._value_serializer(value, ctx)
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/json_schema.py", line 190, in __call__
    self._schema_id = self._registry.register_schema(subject,
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 336, in register_schema
    response = self._rest_client.post(
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
    return self.send_request(url, method='POST', body=body)
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 174, in send_request
    raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Unrecognized field: schemaType (HTTP status code 422, SR code 422)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "json_producer.py", line 172, in <module>
    main(parser.parse_args())
  File "json_producer.py", line 151, in main
    producer.produce(topic=topic, key=str(uuid4()), value=user,
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Unrecognized field: schemaType (HTTP status code 422, SR code 422)"}
Where is the problem? Thanks for your replies.
JSON Schema support was only added to Confluent Schema Registry in version 5.5, which is why the error complains about the schemaType field: the request/response payloads of any older registry simply don't know that field. The compose file above pins confluentinc/cp-schema-registry:5.4.6, which is just below that cutoff.
Upgrading to at least that version, or using the latest image, will fix the error.
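For example, here is a minimal sketch of the upgraded service, assuming a 7.x image (the exact tag below is only an illustration). Note that recent images take the Kafka bootstrap servers directly, since the ZooKeeper-based kafkastore.connection.url setting has been deprecated and removed in newer releases:

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Point the registry at the broker rather than at ZooKeeper.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"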
If you only want to produce JSON, you don't need the registry at all. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/ for more details.
You can use the regular producer.py example and provide the JSON objects as strings on the CLI.
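As a minimal sketch of that approach (the broker address, topic name, and record contents below are made-up placeholders):

import json

from confluent_kafka import Producer

# A plain producer: no serializers and no Schema Registry involved.
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Invoked once per message with the delivery outcome.
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}]')

# Serialize the dict to a JSON string ourselves; the broker only sees bytes.
record = {'name': 'example', 'favorite_number': 42}
producer.produce('test', value=json.dumps(record), on_delivery=delivery_report)
producer.flush()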