Python-processed Avro-formatted data sent through Apache Kafka does not yield the same output when deserialized in an Apache Camel/Java processor
I am running a Kafka broker to which I push messages from a Python program. For efficient data exchange I use the Apache Avro format. At the Kafka broker the messages are picked up by a Camel route with a processor. In this processor I want to deserialize the message and finally push the data to an InfluxDB.
The flow mechanics work, but in the Camel route I do not get out the data I put in.
On the Python side I create a dictionary:
testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # epoch timestamp in nanoseconds
The corresponding Avro schema on the Python side looks like this:
{
  "namespace": "my.namespace",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "double_one", "type": "double"},
    {"name": "double_two", "type": "double"},
    {"name": "double_three", "type": "double"},
    {"name": "time_stamp", "type": "long"}
  ]
}
The Python code that sends the Avro-formatted message to Kafka looks like this:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: FutureRecordMetadata
    """
    schema = avro.schema.parse(schemaDefinition)
    writer = avro.io.DatumWriter(schema)
    bytes_stream = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_stream)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_stream.getvalue()
    messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
    result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
    return result
The message arrives at the broker as expected, is picked up by Camel and processed by the following Java code:
from(kafkaEndpoint) //
    .process(exchange -> {
        Long kafkaInboundTime = Long
                .parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
        if (exchange.getIn().getHeader("kafka.KEY") != null) {
            BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(exchange.getIn().getBody(InputStream.class), null);
            SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);
            System.out.println(datumReader.read(null, decoder).toString());
        }
    }) //
    .to(influxdbEndpoint);
avroSchema is currently hard-coded in the constructor of my class as follows:
avroSchema = SchemaBuilder.record("myRecord") //
.namespace("my.namespace") //
.fields() //
.requiredString("name") //
.requiredDouble("double_one") //
.requiredDouble("double_two") //
.requiredDouble("double_three") //
.requiredLong("time_stamp") //
.endRecord();
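As an aside, the same schema could also be parsed directly from the JSON definition shown earlier instead of being rebuilt with SchemaBuilder; a minimal sketch, where how the JSON string reaches the Java side is left open and merely assumed here:

import org.apache.avro.Schema;

// Hypothetical alternative: parse the JSON schema definition directly rather
// than hard-coding it with SchemaBuilder. schemaJson is assumed to hold the
// same JSON document used on the Python side.
String schemaJson = "{"
        + "\"namespace\": \"my.namespace\","
        + "\"name\": \"myRecord\","
        + "\"type\": \"record\","
        + "\"fields\": ["
        + "  {\"name\": \"name\", \"type\": \"string\"},"
        + "  {\"name\": \"double_one\", \"type\": \"double\"},"
        + "  {\"name\": \"double_two\", \"type\": \"double\"},"
        + "  {\"name\": \"double_three\", \"type\": \"double\"},"
        + "  {\"name\": \"time_stamp\", \"type\": \"long\"}"
        + "]}";
Schema avroSchema = new Schema.Parser().parse(schemaJson);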
The output of System.out.println is:
{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}
Obviously something goes wrong, but I cannot figure out what. Any help is appreciated.
Update 1
Since the Python code runs on an Intel/Windows machine, while Kafka (in a VM) and the Java code run on Linux machines of unknown architecture: could this effect be caused by a different byte order of the systems?
Update 1.1: Byte order can be ruled out. I checked on both sides; both report 'little'.
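(For what it's worth, Avro's binary encoding fixes the byte order itself: doubles are always written little-endian and longs as zig-zag varints, so the host byte order should not matter. If one still wants to check it on the JVM side, a minimal sketch, analogous to sys.byteorder in Python:)

import java.nio.ByteOrder;

// Minimal check of the JVM host's native byte order.
public class ByteOrderCheck {
    public static void main(String[] args) {
        System.out.println(ByteOrder.nativeOrder()); // LITTLE_ENDIAN or BIG_ENDIAN
    }
}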
Update 2
As a check, I changed the schema definition so that all fields are of type string. With this definition the values and keys are transferred correctly - the Python input and the Java/Camel output are identical.
Update 3
The Camel route producer endpoint to Kafka has no special features such as deserializers:
"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"
I found a solution to my problem. The following Python code produces the desired output into Kafka:
# Imports assumed for this snippet (kafka-python and the avro package)
import io
import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from kafka import KafkaProducer
from kafka.producer.future import FutureRecordMetadata

def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: None
    """
    schema = avro.schema.parse(schemaDefinition)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer = DatumWriter(schema)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_writer.getvalue()
    self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)
    try:
        # NOTE: I use the 'AVRO' key to separate avro-formatted messages from others
        result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
    except Exception as err:
        print(err)
    self._messageBrokerWriterConnection.flush()
The key to the solution was adding valueDeserializer=... to the endpoint definition on the Apache Camel side. Without it, the camel-kafka component falls back to its default StringDeserializer, which mangles the binary Avro payload (and which also explains why an all-string schema came through unchanged in update 2):
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
...
TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());
The Apache Camel route code, including the conversion to an InfluxDB point, can then be written like this:
@Component
public class Route_TEST_QUEUE extends RouteBuilder {

    Schema avroSchema = null;

    private Route_TEST_QUEUE() {
        avroSchema = SchemaBuilder //
                .record("ElectronCoolerCryoMessage") //
                .namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //
                .endRecord();
    }

    private String fromEndpoint = TEST_QUEUE.definitionString();

    @Override
    public void configure() throws Exception {

        from(fromEndpoint) //
                .process(messagePayload -> {

                    byte[] data = messagePayload.getIn().getBody(byte[].class);
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
                    GenericRecord record = datumReader.read(null, decoder);

                    try {
                        Point.Builder influxPoint = Point
                                .measurement(record.get("name").toString());
                        long acqStamp = 0L;
                        if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
                            acqStamp = (long) record.get("time_stamp");
                        } else {
                            acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
                        }
                        influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);
                        Map<String, Object> fieldMap = new HashMap<>();
                        avroSchema.getFields().stream() //
                                .filter(field -> !field.name().equals("keyFieldname")) //
                                .forEach(field -> {
                                    Object value = record.get(field.name());
                                    fieldMap.put(field.name().toString(), value);
                                });
                        influxPoint.fields(fieldMap);
                    } catch (Exception e) {
                        MessageLogger.logError(e);
                    }
                }) //
                .to(...InfluxEndpoint...) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
    }
}
This works for me - no Confluent, just plain Kafka.