Creating a KafkaListener without Annotations & without Spring Boot
I am trying to create a Kafka consumer for a topic without using the @KafkaListener annotation. I want to do this because I am trying to create listeners dynamically based on application.properties, without using Spring Boot.
I believe the best approach is to manually create a KafkaListenerContainerFactory. Could someone provide an example of how to do this in its own class?
- With Spring
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(String topic) {
    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());

    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> listenerContainer =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    listenerContainer.setAutoStartup(false);
    // the bean name is used as the prefix of the Kafka consumer thread name
    listenerContainer.setBeanName("kafka-message-listener");
    return listenerContainer;
}

private Map<String, Object> consumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    return props;
}

static class MyMessageListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // do something with the record
    }
}
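To cover the dynamic part of the question, here is a minimal sketch (not from the original answer; the kafka.topics property, the helper method name and the reuse of consumerProperties() above are assumptions) that builds and starts one container per topic listed in application.properties:

// Hypothetical helper, assuming a property such as kafka.topics=foo,bar
// and the consumerProperties() method shown above.
public List<KafkaMessageListenerContainer<String, String>> startListeners(String topicsProperty) {
    List<KafkaMessageListenerContainer<String, String>> containers = new ArrayList<>();
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    for (String topic : topicsProperty.split(",")) {
        ContainerProperties containerProperties = new ContainerProperties(topic.trim());
        containerProperties.setMessageListener(new MyMessageListener());
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        // the bean name becomes the consumer thread name prefix
        container.setBeanName("kafka-listener-" + topic.trim());
        container.start();
        containers.add(container);
    }
    return containers;
}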
- Without Spring
The Kafka documentation is very helpful here. Below is a usage example taken from it.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
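Since this loop blocks the calling thread forever, in practice you would run it on a background thread and stop it cleanly. Below is a sketch (class and method names are my own, not from the Kafka docs) using the standard consumer.wakeup() pattern:

// Illustrative wrapper: runs the poll loop on a background thread and supports a clean shutdown.
// WakeupException comes from org.apache.kafka.common.errors.
public class SimpleConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    SimpleConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("foo", "bar"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown, triggered by consumer.wakeup()
        } finally {
            consumer.close();
        }
    }

    // Call from another thread to break out of poll() and stop the loop.
    public void shutdown() {
        consumer.wakeup();
    }
}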
I had the same requirement. I did not want to use the low-level consumer and call poll myself. I wanted the same logic as @KafkaListener, just configured dynamically, in particular creating multiple Kafka listeners based on configuration.
The solution below is what I was looking for: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
This starts a consumer thread in the background. It also sends a message to test-topic every 10 seconds.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// note: in newer spring-kafka versions ContainerProperties lives in org.springframework.kafka.listener
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicProducerConsumer implements Runnable {

    KafkaTemplate<String, Object> kafkaTemplate;
    ScheduledExecutorService service;

    PeriodicProducerConsumer() {
        // Producer declaration
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        this.kafkaTemplate = new KafkaTemplate<>(producerFactory);

        // Consumer declaration
        Map<String, Object> consumerConfig = new HashMap<>();
        // bootstrap.servers takes host:port pairs, not a URL with a scheme
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "airbus-service-ka-consumer-group");
        DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(
                        consumerConfig,
                        new StringDeserializer(),
                        new StringDeserializer());

        String topic = "test-topic";
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(new AirbusServiceKaMessageListener());
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(
                        kafkaConsumerFactory,
                        containerProperties);
        container.start();
    }

    public void start() {
        service = Executors.newSingleThreadScheduledExecutor();
        service.scheduleWithFixedDelay(this, 5, 10, TimeUnit.SECONDS);
    }

    public void stop() {
        service.shutdown();
    }

    @Override
    public void run() {
        String data = String.format("New Airbus Hello at %s", new Date());
        kafkaTemplate.send("test-topic", data);
    }
}
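Note that the ConcurrentMessageListenerContainer started in the constructor is never stopped. If you need a clean shutdown, a small adjustment (a sketch, not part of the original answer) is to keep the container in a field and stop it alongside the scheduler:

// Sketch: keep a reference so the listener container can be stopped as well.
private ConcurrentMessageListenerContainer<String, String> container;

public void stop() {
    service.shutdown();
    container.stop();   // stops the background consumer thread(s)
}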
Here is the handler for the consumed messages:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

public class AirbusServiceKaMessageListener implements MessageListener<String, Object> {

    private static final Logger LOG = LoggerFactory.getLogger(AirbusServiceKaMessageListener.class);

    @Override
    public void onMessage(ConsumerRecord<String, Object> data) {
        LOG.info("########################## New Consuming Message From Message Listener ##########################");
        LOG.info("Message # {}", data.value());
        LOG.info("#################################################################################################");
    }
}
Bean.xml
<bean name="purekafkaProduceConsume" class="com.myntra.airbus.purekafka.PeriodicProducerConsumer" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
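If you prefer Java configuration over XML, an equivalent registration (a sketch; it assumes the configuration class sits in the same package, since the PeriodicProducerConsumer constructor is package-private) would be:

// Java-config equivalent of the Bean.xml definition above.
@Configuration
public class PeriodicProducerConsumerConfig {

    @Bean(initMethod = "start", destroyMethod = "stop")
    public PeriodicProducerConsumer purekafkaProduceConsume() {
        return new PeriodicProducerConsumer();
    }
}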