使用配置的 IAsyncSerializer 值序列化器调用 Produce,但使用 Avro Serializer 时需要 ISerializer
Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required when using Avro Serializer
我正在使用 Kafka 集群并使用 Transactional Producer 进行原子流式处理(读取-处理-写入)。
// Init Transactions
_transactionalProducer.InitTransactions(DefaultTimeout);
// Begin the transaction
_transactionalProducer.BeginTransaction();
// produce message to one or many topics
var topic = Topics.MyTopic;
_transactionalProducer.Produce(topic, consumeResult.Message);
我正在使用 AvroSerializer,因为我使用 Schema 发布消息。
Produce 抛出异常:
"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"
我见过的所有事务性生产者示例都使用 Produce 方法而不是 ProduceAsync,因此不确定我是否可以简单地切换到 ProduceAsync 并假设事务性生产者将正常运行。如果我错了请纠正我或帮我找文档。
否则,我无法找到非异步的 AvroSerializer,继承自 ISerializer。
public class AvroSerializer<T> : IAsyncSerializer<T>
我没有意识到在创建 Serializer 时可以使用 AsSyncOverAsync
方法。这是因为 Kafka Consumer 仍然是 Sync 而不是 Async。
例如:
new AvroSerializer<TValue>(schemaRegistryClient, serializerConfig).AsSyncOverAsync();
这是该方法的 Confluent 文档。
//
// Summary:
// Create a sync serializer by wrapping an async one. For more information on the
// potential pitfalls in doing this, refer to Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.
public static ISerializer<T> AsSyncOverAsync<T>(this IAsyncSerializer<T> asyncSerializer);
我正在使用 Kafka 集群并使用 Transactional Producer 进行原子流式处理(读取-处理-写入)。
// Init Transactions
_transactionalProducer.InitTransactions(DefaultTimeout);
// Begin the transaction
_transactionalProducer.BeginTransaction();
// produce message to one or many topics
var topic = Topics.MyTopic;
_transactionalProducer.Produce(topic, consumeResult.Message);
我正在使用 AvroSerializer,因为我使用 Schema 发布消息。
Produce 抛出异常:
"System.InvalidOperationException: Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.\r\n at Confluent.Kafka.Producer`2.Produce(TopicPartition topicPartition, Message`2 message, Action`1 deliveryHandler)"
我见过的所有事务性生产者示例都使用 Produce 方法而不是 ProduceAsync,因此不确定我是否可以简单地切换到 ProduceAsync 并假设事务性生产者将正常运行。如果我错了请纠正我或帮我找文档。
否则,我无法找到非异步的 AvroSerializer,继承自 ISerializer。
public class AvroSerializer<T> : IAsyncSerializer<T>
我没有意识到在创建 Serializer 时可以使用 AsSyncOverAsync
方法。这是因为 Kafka Consumer 仍然是 Sync 而不是 Async。
例如:
new AvroSerializer<TValue>(schemaRegistryClient, serializerConfig).AsSyncOverAsync();
这是该方法的 Confluent 文档。
//
// Summary:
// Create a sync serializer by wrapping an async one. For more information on the
// potential pitfalls in doing this, refer to Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.
public static ISerializer<T> AsSyncOverAsync<T>(this IAsyncSerializer<T> asyncSerializer);