Spring Kafka 流处理器不工作
Spring Kafka stream processor not working
我正在尝试使用 Spring 引导编写 Kafka 流处理器,但是当消息生成到主题中时它没有被调用。
我有以下生产者可以很好地处理主题名称 adt.events.location
。
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "adt.events.location";
private final KafkaTemplate<String, Object> kafkaTemplate;
public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@EventListener(ApplicationStartedEvent.class)
public void produce() {
this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedIn(1L, 1001L, 104L, 11L, 6L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientBedChanged(1L, 1001L, 7L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientRoomChanged(1L, 1001L, 10L));
this.kafkaTemplate.send(TOPIC, "2", new EventPatientCheckedIn(2L, 1002L, 110L, 18L, 2L));
this.kafkaTemplate.send(TOPIC, "3", new EventPatientCheckedIn(3L, 1003L, 111L, 16L, 1L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedOut(1L, 1001L));
this.kafkaTemplate.send(TOPIC, "3", new EventPatientBedChanged(3L, 1003L, 3L));
}
}
主题消息有不同的类型,并且采用 Avro 格式。模式在模式注册表中注册 Avro union.
这些是主题
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topicEventsLocation() {
return TopicBuilder.name("adt.events.location").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicPatientLocation() {
return TopicBuilder.name("adt.patient.location").partitions(1).replicas(1).build();
}
}
application.yml 我正在使用 cp-all-in-one-community 作为 docker-file
server:
port: 9000
spring:
kafka:
properties:
auto:
register:
schemas: false
use:
latest:
version: true
schema:
registry:
url: http://localhost:8081
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
streams:
application-id: kafka-demo
properties:
default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
Spring 启动应用程序
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
和处理器
@Component
public class Processor {
final StreamsBuilder builder = new StreamsBuilder();
@Autowired
public void process() {
...
}
}
预期的行为是在每次生成消息时看到模式名称打印出来,并且输出主题 adt.patient.location
被流处理输出填充,但没有任何反应。
我是 Kafka 的新手,所以我可能遗漏了一些东西。
更新
我实际上错过了 @EnableKafkaStreams
注释。
但现在我得到以下错误:
2021-04-07 16:02:16.967 ERROR 120225 --- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [LocationService-9611eedf-df9b-4fe5-9a7d-058027cee22a] Topology with no input topics will create no stream threads and no global thread.
2021-04-07 16:02:16.967 WARN 120225 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
在 KafkaTemplate
上使用 @Autowired
。我认为这就是你所缺少的东西。我给出的例子没有使用 AvroSerializer。所以我假设你的序列化器正在工作。至少您应该看到消息到达消费者或序列化错误。此外,您可以改进处理回调的方法并使用更一致的消息记录。例如,使用 ProducerRecord
创建您要发送的消息。使用 ListenableFuture
.
添加回调
@Slf4j
@Service
public class Producer {
@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
public void produce() {
String key = "1";
Object value = EventPatientCheckedIn....
ProducerRecord<String, Object> producerRecord = buildProducerRecord(TOPIC, key, value);
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(producerRecord);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) { handleFailure(key, value, ex); }
@Override
public void onSuccess(SendResult<String, Object> result) { handleSuccess(key, value, result); }
});
}
private ProducerRecord<String, Object> buildProducerRecord(String topic, String key, Object value) {
List<Header> recordHeaders = List.of(new RecordHeader("event-source", "scanner".getBytes()));
return new ProducerRecord<String, Object>(topic, null, key, value, recordHeaders);
}
private void handleSuccess(String key, Object value, SendResult<String, Object> result) {
log.info("message send successfully for the key: {} and value: {} at partition: {}", key, value, result.getRecordMetadata().partition());
}
private void handleFailure(String key, Object value, Throwable ex) {
log.error("error sending the message and the exception us {}", ex.getMessage());
try { throw ex; }
catch (Throwable throwable) {
log.error("error on failure: {}", throwable.getMessage());
}
}
}
更新: 我认为您缺少配置 Properties
然后在 Processor
上创建 streams.start();
。我将此示例基于 this reference.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "adt.events.location");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, YOUR_AVRO_SERDE_HERE);
...
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
我发现了两个错误:
- 添加注释
@EnableKafkaStreams
- 处理器在
StreamsBuilder
注入时出错
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
@Autowired
public void process(final StreamsBuilder builder) {
logger.info("Processing location events");
...
}
我正在尝试使用 Spring 引导编写 Kafka 流处理器,但是当消息生成到主题中时它没有被调用。
我有以下生产者可以很好地处理主题名称 adt.events.location
。
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "adt.events.location";
private final KafkaTemplate<String, Object> kafkaTemplate;
public Producer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@EventListener(ApplicationStartedEvent.class)
public void produce() {
this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedIn(1L, 1001L, 104L, 11L, 6L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientBedChanged(1L, 1001L, 7L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientRoomChanged(1L, 1001L, 10L));
this.kafkaTemplate.send(TOPIC, "2", new EventPatientCheckedIn(2L, 1002L, 110L, 18L, 2L));
this.kafkaTemplate.send(TOPIC, "3", new EventPatientCheckedIn(3L, 1003L, 111L, 16L, 1L));
this.kafkaTemplate.send(TOPIC, "1", new EventPatientCheckedOut(1L, 1001L));
this.kafkaTemplate.send(TOPIC, "3", new EventPatientBedChanged(3L, 1003L, 3L));
}
}
主题消息有不同的类型,并且采用 Avro 格式。模式在模式注册表中注册 Avro union.
这些是主题
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topicEventsLocation() {
return TopicBuilder.name("adt.events.location").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicPatientLocation() {
return TopicBuilder.name("adt.patient.location").partitions(1).replicas(1).build();
}
}
application.yml 我正在使用 cp-all-in-one-community 作为 docker-file
server:
port: 9000
spring:
kafka:
properties:
auto:
register:
schemas: false
use:
latest:
version: true
schema:
registry:
url: http://localhost:8081
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
streams:
application-id: kafka-demo
properties:
default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
Spring 启动应用程序
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
和处理器
@Component
public class Processor {
final StreamsBuilder builder = new StreamsBuilder();
@Autowired
public void process() {
...
}
}
预期的行为是在每次生成消息时看到模式名称打印出来,并且输出主题 adt.patient.location
被流处理输出填充,但没有任何反应。
我是 Kafka 的新手,所以我可能遗漏了一些东西。
更新
我实际上错过了 @EnableKafkaStreams
注释。
但现在我得到以下错误:
2021-04-07 16:02:16.967 ERROR 120225 --- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [LocationService-9611eedf-df9b-4fe5-9a7d-058027cee22a] Topology with no input topics will create no stream threads and no global thread.
2021-04-07 16:02:16.967 WARN 120225 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
在 KafkaTemplate
上使用 @Autowired
。我认为这就是你所缺少的东西。我给出的例子没有使用 AvroSerializer。所以我假设你的序列化器正在工作。至少您应该看到消息到达消费者或序列化错误。此外,您可以改进处理回调的方法并使用更一致的消息记录。例如,使用 ProducerRecord
创建您要发送的消息。使用 ListenableFuture
.
@Slf4j
@Service
public class Producer {
@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
public void produce() {
String key = "1";
Object value = EventPatientCheckedIn....
ProducerRecord<String, Object> producerRecord = buildProducerRecord(TOPIC, key, value);
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(producerRecord);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) { handleFailure(key, value, ex); }
@Override
public void onSuccess(SendResult<String, Object> result) { handleSuccess(key, value, result); }
});
}
private ProducerRecord<String, Object> buildProducerRecord(String topic, String key, Object value) {
List<Header> recordHeaders = List.of(new RecordHeader("event-source", "scanner".getBytes()));
return new ProducerRecord<String, Object>(topic, null, key, value, recordHeaders);
}
private void handleSuccess(String key, Object value, SendResult<String, Object> result) {
log.info("message send successfully for the key: {} and value: {} at partition: {}", key, value, result.getRecordMetadata().partition());
}
private void handleFailure(String key, Object value, Throwable ex) {
log.error("error sending the message and the exception us {}", ex.getMessage());
try { throw ex; }
catch (Throwable throwable) {
log.error("error on failure: {}", throwable.getMessage());
}
}
}
更新: 我认为您缺少配置 Properties
然后在 Processor
上创建 streams.start();
。我将此示例基于 this reference.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "adt.events.location");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, YOUR_AVRO_SERDE_HERE);
...
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
我发现了两个错误:
- 添加注释
@EnableKafkaStreams
- 处理器在
StreamsBuilder
注入时出错
@SpringBootApplication
@EnableKafkaStreams
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
@Autowired
public void process(final StreamsBuilder builder) {
logger.info("Processing location events");
...
}