使用 php-enqueue 生成 avro 消息

Producing avro message using php-enqueue

我正在研究一种使用 php-enqueue.

生成从 php 到 kafka 的 avro 消息的方法

他们的 documentation 声明您可以使用其他格式,包括 Apache Avro。

By default the transport serializes messages to json format but you might want to use another format such as Apache Avro. For that you have to implement Serializer interface and set it to the context, producer or consumer. If a serializer set to context it will be injected to all consumers and producers created by the context.

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

示例中的序列化程序正在与字符串相互转换。据我所知,Avro 格式是二进制的,那么自定义序列化程序在这种情况下应该如何工作?

Php 字符串可以包含二进制数据。这是使用已在模式注册表中注册的模式 ID 生成 avro 消息的部分实现。 avro 的序列化是使用 jaumo/avro 实现完成的。

public function toString(RdKafkaMessage $message): string
{
    ...

    $message = json_decode($message->getBody(), true);

    $encodedHeader = $this->createAvroHeader($schemaId);
    $encodedMessage = Serde::encodeMessage($parsedSchema, $message);

    return $encodedHeader . $encodedMessage;
}

private function createAvroHeader(int $schemaId): string
{
    $binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
    return pack("C", 0) . $binarySchemaId;
}