使用正常工作的 Producer/Consumer 时出现 "SerializationException: Unknown magic byte"

"SerializationException: Unknown magic byte" with a working Producer/Consumer

我查了类似的问题,大部分都是序列化的问题。但就我而言,我有一对工作正常的生产者和消费者代码。我可以通过序列化和反序列化在 java 中发送和检索数据。

当我尝试使用此配置使用 JDBC 接收器连接器保存数据时:

curl -XPOST --header "Content-Type: application/json"  XXXXXXXXXX.XXXXXXXXXX.com:8083/connectors  -d 
'{  
    "name": "sink_AVROTESTNEW",  
    "config": {    
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",    
        "tasks.max": 3,   
        "connection.url": "jdbc:oracle:thin:@XXXXXXXXXX:1854:XXXXXXXXXX",
        "connection.user": "XXXXXXXXXX",    
        "connection.password": "XXXXXXXXXX",    
        "table.name.format": "AVROTEST",
        "topics": "AVROTESTNEW",
        "auto.create": "false"
    }
}'

它给出 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! 错误。我的生产者和消费者代码直接取自 confluent 网站。

生产者:

package com.test;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;


/**
 * Sample producer that publishes a single Avro {@link GenericRecord} to the
 * {@code AVROTESTNEW} topic, serializing the value through the Confluent
 * Schema Registry (KafkaAvroSerializer writes the magic byte + schema id
 * that the AvroConverter on the Connect side expects).
 */
public class Producer {
    public static void main(String[] args) {

        // String keys, Avro values registered against the Schema Registry.
        // NOTE: the original post had a leading space inside the bootstrap
        // server and registry URLs, which would break the connection.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX:9072");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://XXXXXXXXXX.XXXXXXXX.XXXXXX:8071/");

        String key = "key2";

        // Writer schema: one mandatory string field plus four nullable strings.
        String userSchema = "{\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"CUSTOMER\",\n" +
                "  \"namespace\": \"XXXXXXXXX\",\n" +
                "  \"fields\": [\n" +
                "    {\n" +
                "      \"name\": \"FIRST_NAME\",\n" +
                "      \"type\": \"string\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"LAST_NAME\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"HEIGHT\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"WEIGHT\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"AUTOMATED_EMAIL\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    }\n" +
                "  ]\n" +
                "}\n";

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("FIRST_NAME", "X");
        avroRecord.put("LAST_NAME", "X");
        avroRecord.put("HEIGHT", "X");
        avroRecord.put("WEIGHT", "X");
        avroRecord.put("AUTOMATED_EMAIL", "X");

        // Typed producer instead of the raw type, and try-with-resources so
        // the producer is always closed; close() also flushes pending sends,
        // but we flush explicitly to surface serialization errors here.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, GenericRecord> record =
                    new ProducerRecord<>("AVROTESTNEW", key, avroRecord);
            try {
                producer.send(record);
                producer.flush();
            } catch (SerializationException e) {
                // Don't swallow silently: a serialization failure here is the
                // most likely cause of schema/registry misconfiguration.
                System.err.println("Failed to serialize record: " + e.getMessage());
            }
        }
    }
}

消费者:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Sample consumer that reads Avro-encoded records from {@code AVROTESTNEW},
 * deserializing values via the Confluent Schema Registry, and prints each
 * record's offset, key, and value. Runs until interrupted.
 */
public class Consumer {
    public static void main(String[] args) {

        // String keys, Avro values resolved against the Schema Registry.
        // NOTE: the original post had a leading space inside the bootstrap
        // server and registry URLs, which would break the connection.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX:9072");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://XXXXXXXXXX.XXXXXXX.XXXXX:8071/");
        // Start from the beginning of the topic when no committed offset exists.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "AVROTESTNEW";
        final org.apache.kafka.clients.consumer.Consumer<String, GenericRecord> consumer =
                new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                // poll(long) is deprecated; poll(Duration) bounds the total
                // blocking time including metadata fetches.
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

呃,这是一个愚蠢的错误。通过将 curl 命令更改为以下内容来解决它:
curl -XPOST --header "Content-Type: application/json"  XXXXXXXXXXXXXXXXX:8083/connectors  -d 
'{  
    "name": "sink_AVROTESTBUNNEW",  
    "config": {    
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",    
        "tasks.max": 3,   
        "connection.url": "jdbc:oracle:thin:@XXXXXXXX",
        "connection.user": "XXXXXXXXXXX",    
        "connection.password": "XXXXXXXXXX",    
        "table.name.format": "AVROTEST",
        "topics": "AVROTESTBUN",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://XXXXXXXXXXXXXXXXXX:8071",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"false",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "auto.create": true,
        "auto.evolve": true        
    }
}' 

定义键和值转换器解决了它。