confluent-kafka-python json_producer: Unrecognized field: schemaType

I get the error "Unrecognized field: schemaType (HTTP status code 422, SR code 422)" when I run the json_producer.py example from the Confluent GitHub repository.

Here is my docker-compose:

version: '3.3'
services:
    postgres:
        container_name: postgres
        ports:
            - '5432:5432'
        environment:
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - POSTGRES_DB=shipment_db
            - PGPASSWORD=password
        image: 'debezium/postgres:13'

    zookeeper:
        container_name: zookeeper
        ports:
            - '2181:2181'
            - '2888:2888'
            - '3888:3888'
        image: 'debezium/zookeeper:1.7'

    kafka:
        container_name: kafka
        ports:
            - '9092:9092'
        links:
            - 'zookeeper:zookeeper'
        image: 'debezium/kafka:1.7'
        environment:
            ZOOKEEPER_CONNECT: zookeeper:2181
        #volumes:
        #    - ./kafka:/kafka/config:rw

    connect:
        image: debezium/connect:1.7
        hostname: connect
        container_name: connect
        ports:
            - 8083:8083
        environment:
            BOOTSTRAP_SERVERS: kafka:9092
            GROUP_ID: 1
            CONFIG_STORAGE_TOPIC: my_connect_configs
            OFFSET_STORAGE_TOPIC: my_connect_offsets
            STATUS_STORAGE_TOPIC: my_connect_statuses
            CONNECT_BOOTSTRAP_SERVERS: kafka:9092
            CONNECT_GROUP_ID: connect-cluster-A
            CONNECT_PLUGIN_PATH: /kafka/data, /kafka/connect
            #EXTERNAL_LIBS_DIR: /kafka/external_libs,/kafka/data
            CLASSPATH: /kafka/data/*
            KAFKA_CONNECT_PLUGINS_DIR: /kafka/data, /kafka/connect
            #CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect=DEBUG,org.apache.plc4x.kafka.Plc4xSinkConnector=DEBUG"

        volumes:
            - type: bind
              source: ./plugins
              target: /kafka/data
        depends_on:
            - zookeeper
            - kafka
            - postgres
        links:
            - zookeeper
            - kafka
            - postgres

    schema-registry:
        image: confluentinc/cp-schema-registry:5.4.6
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
            - zookeeper
            - kafka
        ports:
            - "8081:8081"
        environment:
            SCHEMA_REGISTRY_HOST_NAME: schema-registry
            SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
            SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"

    ksqldb-server:
        image: confluentinc/ksqldb-server:0.23.1
        hostname: ksqldb-server
        container_name: ksqldb-server
        depends_on:
            - kafka
            - zookeeper
            - schema-registry
        ports:
            - "8088:8088"
        volumes:
            - "./confluent-hub-components/:/usr/share/kafka/plugins/"
        environment:
            KSQL_LISTENERS: "http://0.0.0.0:8088"
            KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
            KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
            KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
            KSQL_KSQL_CONNECT_URL: http://connect:8083


The code for json_producer is in the repository here

The error appears when I run this command:

$ python3 json_producer.py -b 0.0.0.0:9092 -s http://0.0.0.0:8081 -t test

The stack trace is as follows:

Traceback (most recent call last):
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
    value = self._value_serializer(value, ctx)
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/json_schema.py", line 190, in __call__
    self._schema_id = self._registry.register_schema(subject,
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 336, in register_schema
    response = self._rest_client.post(
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
    return self.send_request(url, method='POST', body=body)
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 174, in send_request
    raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Unrecognized field: schemaType (HTTP status code 422, SR code 422)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "json_producer.py", line 172, in <module>
    main(parser.parse_args())
  File "json_producer.py", line 151, in main
    producer.produce(topic=topic, key=str(uuid4()), value=user,
  File "/home/alessio/fm_v2/python/confluent-kafka-python-master/examples/venv_examples/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Unrecognized field: schemaType (HTTP status code 422, SR code 422)"}

Where is the problem? Thanks in advance for any reply.

JSON Schema support was not added to Confluent Schema Registry until version 5.5, which is why the error complains about the schemaType field: the request/response payloads of any earlier registry version simply do not include that field.
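For context, when the serializer registers a schema it issues a POST to the registry's /subjects/&lt;subject&gt;/versions endpoint, and newer clients include a schemaType field in the body (the schema shown is just an illustration):

```json
{
  "schemaType": "JSON",
  "schema": "{\"type\": \"object\", \"properties\": {\"name\": {\"type\": \"string\"}}}"
}
```

A 5.4.x registry does not recognize schemaType and rejects the request with HTTP 422, which is exactly the error in the stack trace.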

Upgrading the schema-registry image to at least that version, or simply using the latest tag, will fix the error.
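In the compose file above, that means bumping the schema-registry image tag; for example (7.3.0 is just one recent tag that includes JSON Schema support, and note that newer images are configured with the Kafka bootstrap servers rather than the deprecated ZooKeeper connection URL):

```yaml
    schema-registry:
        image: confluentinc/cp-schema-registry:7.3.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
            - kafka
        ports:
            - "8081:8081"
        environment:
            SCHEMA_REGISTRY_HOST_NAME: schema-registry
            # newer images read the broker list directly;
            # SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL is deprecated
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
            SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
```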

If you only want to produce JSON, you do not need the registry at all. More details are in https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/ .
You can use the regular producer.py example and provide the JSON objects as strings on the CLI.
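A minimal sketch of that registry-free approach, assuming a broker at 0.0.0.0:9092 and a topic named test (the Producer calls are commented out so the snippet also runs without a broker):

```python
import json

# from confluent_kafka import Producer  # uncomment with a running broker

def to_json_bytes(obj):
    """Serialize a dict to UTF-8 JSON bytes; no Schema Registry involved."""
    return json.dumps(obj).encode("utf-8")

payload = to_json_bytes({"name": "alessio", "favorite_number": 7})
print(payload)

# producer = Producer({"bootstrap.servers": "0.0.0.0:9092"})
# producer.produce("test", value=payload)
# producer.flush()
```

Consumers then just json.loads() the message value; no schemaType negotiation ever happens, so the registry version no longer matters.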