How to test a kafka consumer against a real kafka broker running on a server?
I'm having a hard time understanding some Kafka concepts in Java Spring Boot. I'd like to test a consumer against a real Kafka broker running on a server, where some producers are writing (or have already written) data to various topics. I want to establish a connection with that server, consume the data, and verify or process its contents in a test.
The vast majority of examples on the internet (actually, all of the examples I've seen so far) involve an embedded Kafka, EmbeddedKafkaBroker, and show producers and consumers implemented locally on a single machine. I haven't found any example that explains how to establish a connection with a remote Kafka server and read data from a particular topic.
I wrote some code and printed the broker address:
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
What I got was 127.0.0.1:9092, i.e. local, so no connection to the remote server was established.
On the other hand, when I run the SpringBootApplication, I do get payloads from the remote broker.
The receiver:
@Component
public class Receiver {
private static final String TOPIC_NAME = "X";
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
LOGGER.info("received the following payload: '{}'", payload);
latch.countDown();
}
}
The configuration:
@EnableKafka
@Configuration
public class ByteReceiverConfig {
@Autowired
EmbeddedKafkaBroker kafkaEmbeded;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupIdConfig;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
Map<String, Object> consumerProperties() {
final Map<String, Object> properties =
KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
return properties;
}
}
The test:
@EnableAutoConfiguration
@EnableKafka
@SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
@EmbeddedKafka
@ContextConfiguration(classes = ByteReceiverConfig.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
public class KafkaTest {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
Receiver receiver;
@BeforeEach
void waitForAssignment() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
System.out.println(messageListenerContainer.toString());
System.out.println(embeddedKafkaBroker.getTopics().size());
System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
System.out.println(embeddedKafkaBroker.getBrokersAsString());
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic());
}
}
@Test
public void testReceive() {
}
}
I hope someone can shed some light on the following questions:
1. Can an instance of the EmbeddedKafkaBroker class be used to test data from a remote broker, or is it only for local tests, in which I would produce (i.e. send) data to a topic that I created myself and then consume it?
2. Is it possible to write a test class for a real Kafka server? For example, to verify that a connection has been established, or that data has been read from a specific topic. Which annotations, configurations and classes would be needed in that case?
3. If I only want to consume data, do I have to provide a producer configuration in the configuration file (which would be odd, but all the examples I've come across so far do it)?
4. Do you know of any resources (books, websites, etc.) that show real examples of using Kafka, i.e. with a remote Kafka server, with just a producer or just a consumer?
You don't need an embedded broker at all if you just want to talk to an external broker.
Yes, just set the bootstrap servers property appropriately.
No, you don't need a producer configuration.
EDIT
@SpringBootApplication
public class So56044105Application {
public static void main(String[] args) {
SpringApplication.run(So56044105Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56044105", 1, (short) 1);
}
}
spring.kafka.bootstrap-servers=10.0.0.8:9092
spring.kafka.consumer.enable-auto-commit=false
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So56044105Application.class, So56044105ApplicationTests.Config.class })
public class So56044105ApplicationTests {
@Autowired
public Config config;
@Test
public void test() throws InterruptedException {
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(config.received.get(0)).isEqualTo("foo");
}
@Configuration
public static class Config implements ConsumerSeekAware {
List<String> received = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(3);
@KafkaListener(id = "so56044105", topics = "so56044105")
public void listen(String in) {
System.out.println(in);
this.received.add(in);
this.latch.countDown();
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
System.out.println("Seeking to beginning");
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
}
There are some samples in this repository for bootstrapping real Kafka producers and consumers across a variety of configurations: plaintext, SSL, with and without authentication, and so on.
Note: the repo above contains examples for the book Effective Kafka, which I authored. That said, they can be used freely without the book, and hopefully they are just as useful on their own.
More to the point, here is a pair of basic producer and consumer examples.
/** A sample Kafka producer. */
import static java.lang.System.*;
import java.util.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
public final class BasicProducerSample {
public static void main(String[] args) throws InterruptedException {
final var topic = "getting-started";
final Map<String, Object> config =
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
try (var producer = new KafkaProducer<String, String>(config)) {
while (true) {
final var key = "myKey";
final var value = new Date().toString();
out.format("Publishing record with value %s%n",
value);
final Callback callback = (metadata, exception) -> {
out.format("Published with metadata: %s, error: %s%n",
metadata, exception);
};
// publish the record, handling the metadata in the callback
producer.send(new ProducerRecord<>(topic, key, value), callback);
// wait a second before publishing another
Thread.sleep(1000);
}
}
}
}
/** A sample Kafka consumer. */
import static java.lang.System.*;
import java.time.*;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
public final class BasicConsumerSample {
public static void main(String[] args) {
final var topic = "getting-started";
final Map<String, Object> config =
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer-sample",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try (var consumer = new KafkaConsumer<String, String>(config)) {
consumer.subscribe(Set.of(topic));
while (true) {
final var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
out.format("Got record with value %s%n", record.value());
}
consumer.commitAsync();
}
}
}
}
Now, these obviously aren't unit tests. But with very little rework they could be turned into some. The next step would be to remove the Thread.sleep() and add assertions. Note that because Kafka is inherently asynchronous, naively asserting a published message in the consumer immediately after publishing will fail. For a robust, repeatable test, you may want to use something like Timesert.
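To illustrate the idea behind such timed assertions, here is a minimal hand-rolled sketch (Timesert's actual API is not shown here, so this is only an analogous helper, not its real interface): the assertion is retried until it passes or a deadline expires, which absorbs Kafka's publish-to-consume latency.

```java
import java.util.concurrent.atomic.AtomicReference;

public final class AwaitAssertSketch {

    /** Re-runs the assertion until it passes or the timeout elapses. */
    static void await(long timeoutMillis, Runnable assertion) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) throw e; // give up
                Thread.sleep(10); // back off briefly before retrying
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate an asynchronous "publish": the value only appears after a
        // delay, much like a record arriving at the consumer after a poll.
        final var received = new AtomicReference<String>();
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            received.set("foo");
        }).start();

        // A naive immediate assertion would fail here; the timed one passes
        // once the background thread has delivered the value.
        await(5000, () -> {
            if (!"foo".equals(received.get())) {
                throw new AssertionError("expected foo, got " + received.get());
            }
        });
        System.out.println("assertion passed: " + received.get());
    }
}
```

In a real test you would replace the background thread with the producer side and have the assertion inspect whatever your consumer (or a collecting listener) has received so far.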