KafkaStreams 如何实例化一个ConsumerRecordFactory?
KafkaStreams How to instantiate a ConsumerRecordFactory?
我正在尝试使用 Kafka Streams 提供的 ConsumerRecordFactory
主要遵循 confluent doc 来测试流应用程序,这是我目前的代码:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
我的问题是,当我编译代码时出现以下错误:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
所以我知道 kafka 流定义了泛型方法 create(K,V,long)
并且当我使用非泛型类型创建我的工厂时,我创建了一个与第一个方法冲突的新方法。
我的问题是我应该如何实例化我的 ConsumerRecordFactory
?
我尝试使用 ConsumerRecordFactory<Object, Integer>
让我的工厂更通用,但是推断的类型不匹配。而且我找不到其他示例 confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and 似乎使用与文档相同的代码。
(我知道这个问题更多的是 java 而不是 kafka 流,但我认为用 apache-kafka-streams
标记它是接触习惯 [=12= 的人的好方法])
以下代码中存在一些问题:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
- 您在
ConsumerRecordFactory
中将 valueType 定义为 Integer,但在 create()
方法中您传递的是 Long 输入值。
factory.create()
returns 一个 ConsumerRecord
而不是 ConsumerRecordFactory
.
关于方法的歧义,你是对的。所以要避免这个问题,请使用以下内容:
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte[], byte[]> record = factory.create("input-topic","key", 42);
我正在尝试使用 Kafka Streams 提供的 ConsumerRecordFactory
主要遵循 confluent doc 来测试流应用程序,这是我目前的代码:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
我的问题是,当我编译代码时出现以下错误:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
所以我知道 kafka 流定义了泛型方法 create(K,V,long)
并且当我使用非泛型类型创建我的工厂时,我创建了一个与第一个方法冲突的新方法。
我的问题是我应该如何实例化我的 ConsumerRecordFactory
?
我尝试使用 ConsumerRecordFactory<Object, Integer>
让我的工厂更通用,但是推断的类型不匹配。而且我找不到其他示例 confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and
(我知道这个问题更多的是 java 而不是 kafka 流,但我认为用 apache-kafka-streams
标记它是接触习惯 [=12= 的人的好方法])
以下代码中存在一些问题:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
- 您在
ConsumerRecordFactory
中将 valueType 定义为 Integer,但在create()
方法中您传递的是 Long 输入值。 factory.create()
returns 一个ConsumerRecord
而不是ConsumerRecordFactory
.
关于方法的歧义,你是对的。所以要避免这个问题,请使用以下内容:
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte[], byte[]> record = factory.create("input-topic","key", 42);