Python-processed, Avro-formatted data sent through Apache Kafka does not yield the same output when deserialized in an Apache Camel/Java processor

I am running a Kafka broker to which I push messages from a Python program. For efficient data exchange I use the Apache Avro format. At the Kafka broker the messages are picked up by a Camel route with a processor. In this processor I want to deserialize the message and finally push the data to an InfluxDB.

The mechanics of the flow work, but in the Camel route I do not get out the data I put in. On the Python side I create a dictionary:

testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # nanosecond timestamp; int, since Python 3 has no long()

The corresponding Avro schema on the Python side looks like this:

{
  "namespace": "my.namespace",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "name",         "type": "string"},
    {"name": "double_one",   "type": "double"},
    {"name": "double_two",   "type": "double"},
    {"name": "double_three", "type": "double"},
    {"name": "time_stamp",   "type": "long"}
  ]
}

The Python code that sends the Avro-formatted message to Kafka looks like this:

import io

import avro.io
import avro.schema
from kafka import KafkaProducer
from kafka.producer.future import FutureRecordMetadata


def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending message to kafka broker in the avro binary format
    :param dataDict: data dictionary containing message data
    :param topic_id: the Kafka topic to send message to
    :param schemaDefinition: JSON schema definition
    :return: FutureRecordMetadata
    """
    schema = avro.schema.parse(schemaDefinition)
    writer = avro.io.DatumWriter(schema)
    bytes_stream = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_stream)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_stream.getvalue()

    messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
    
    result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
    return result
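
As a sanity check, the Avro encoding can be verified locally, without Kafka, by decoding the bytes straight back with the same schema. A minimal round-trip sketch using the same avro API as above:

import io

import avro.io
import avro.schema


def localRoundTrip(dataDict: dict, schemaDefinition: str) -> dict:
    """Encode the dict to Avro binary and decode it again with the same schema."""
    schema = avro.schema.parse(schemaDefinition)

    # encode, exactly as in sendAvroFormattedMessage
    bytes_stream = io.BytesIO()
    avro.io.DatumWriter(schema).write(dataDict, avro.io.BinaryEncoder(bytes_stream))
    raw_bytes = bytes_stream.getvalue()

    # decode the very same bytes again
    decoder = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return avro.io.DatumReader(schema).read(decoder)

If this returns the original values, the Python serialization itself can be ruled out as the source of the problem.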

The message arrives at the broker as expected, is picked up by Camel and processed by the following Java code:

from(kafkaEndpoint) //
                .process(exchange -> {
                    Long kafkaInboundTime = Long
                            .parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
                    if (exchange.getIn().getHeader("kafka.KEY") != null) {

                        BinaryDecoder decoder = DecoderFactory.get()
                                .binaryDecoder(exchange.getIn().getBody(InputStream.class), null);

                        SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);

                        System.out.println(datumReader.read(null, decoder).toString());
                    }
                }) //
                .to(influxdbEndpoint);

avroSchema is currently hard-coded in the constructor of my class as follows:

avroSchema = SchemaBuilder.record("myRecord") //
                .namespace("my.namespace") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //  
                .endRecord();

The output of System.out.println is:

{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}

Obviously something goes wrong, but I cannot figure out what. Any help is appreciated.

Update 1: The Python code runs on an Intel/Windows machine, while Kafka (in a VM) and the Java code run on Linux machines with an architecture unknown to me. Could the effect be caused by the systems having different byte orders?

Update 1.1: Byte order can be ruled out. I checked both sides; both are 'little'.
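
For the record, the byte order of the host can be read directly on the Python side:

import sys

print(sys.byteorder)  # reports 'little' here

On the Java side, java.nio.ByteOrder.nativeOrder() gives the same information.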

Update 2: As a check, I changed the schema definition so that all fields have type string. With this definition, values and keys are transferred correctly - the Python input and the Java/Camel output are identical.
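
The all-string schema for this check is just the one above with every field typed as string (and the dictionary values converted to strings accordingly), along these lines:

{
  "namespace": "my.namespace",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "name",         "type": "string"},
    {"name": "double_one",   "type": "string"},
    {"name": "double_two",   "type": "string"},
    {"name": "double_three", "type": "string"},
    {"name": "time_stamp",   "type": "string"}
  ]
}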

Update 3: The Kafka endpoint of the Camel route has no special settings such as a deserializer:

"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"

I found a solution to the problem. The following Python code produces the desired output into Kafka:

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from kafka import KafkaProducer


def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
        -> None:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: None
    """
    schema = avro.schema.parse(schemaDefinition)

    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer = DatumWriter(schema)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_writer.getvalue()

    self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)

    try:
        # NOTE: I use the 'AVRO' key to separate avro formatted messages from others 
        result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
    except Exception as err:
        print(err)
    self._messageBrokerWriterConnection.flush()
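
For illustration, a call with the test dictionary and schema from the question could look roughly like this; sender stands for an instance of the class the method lives on, and the topic is passed as the plain topic name here, whereas my class actually uses a MessageBrokerQueue constant:

import json
import time

schema_json = json.dumps({
    "namespace": "my.namespace",
    "name": "myRecord",
    "type": "record",
    "fields": [
        {"name": "name",         "type": "string"},
        {"name": "double_one",   "type": "double"},
        {"name": "double_two",   "type": "double"},
        {"name": "double_three", "type": "double"},
        {"name": "time_stamp",   "type": "long"}
    ]
})

testDict = {
    'name': 'avroTest',
    'double_one': 1.2345,
    'double_two': 1.23,
    'double_three': 2.345,
    'time_stamp': int(time.time() * 1000000000),
}

# sender: instance of the class that owns sendAvroFormattedMessage
sender.sendAvroFormattedMessage(testDict, 'myTopicName', schema_json)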

The key to the solution was adding valueDeserializer=... to the endpoint definition on the Apache Camel side - without it, the camel-kafka component falls back to its default StringDeserializer, which mangles the binary Avro payload:

import org.apache.kafka.common.serialization.ByteArrayDeserializer;

 ...

TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());

The Apache Camel route code, including the conversion to an InfluxDB point, can then be written like this:

@Component
public class Route_TEST_QUEUE extends RouteBuilder {

    Schema avroSchema = null;

    private Route_TEST_QUEUE() {
        avroSchema = SchemaBuilder //
                .record("ElectronCoolerCryoMessage") //
                .namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //
                .endRecord();
    }

    private String fromEndpoint = TEST_QUEUE.definitionString();

    @Override
    public void configure() throws Exception {

        from(fromEndpoint) //
                .process(messagePayload -> {        
                    byte[] data = messagePayload.getIn().getBody(byte[].class);
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
                    GenericRecord record = datumReader.read(null, decoder);

                    try {
                        Point.Builder influxPoint = Point
                            .measurement(record.get("name").toString());
                        long acqStamp = 0L;
                        if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
                            acqStamp = (long) record.get("time_stamp");
                        } else {
                            acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
                        }

                        influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);

                        Map<String, Object> fieldMap = new HashMap<>();

                        avroSchema.getFields().stream() //
                                .filter(field -> !field.name().equals("keyFieldname")) //
                                .forEach(field -> {
                                    Object value = record.get(field.name());
                                    fieldMap.put(field.name(), value);
                                });

                        influxPoint.fields(fieldMap);

                    } catch (Exception e) {
                         MessageLogger.logError(e);
                    }
                }) //
                .to(...InfluxEndpoint...) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
    }
}

This works for me - no Confluent stack involved, just plain Kafka.