Micronaut 3+ 和 Kafka 集成
Micronaut 3+ and Kafka integration
我目前正在尝试熟悉 micronaut 和 Kafka。我已经按照文档在微服务中创建了一个 KafkaClient,在另一个微服务中创建了一个 KafkaListener。代码如下所示:
客户。这是按预期工作的。我正在兽医评论主题中发布新消息:
@KafkaClient
public interface VetReviewClient {
@Topic("vet-reviews")
void send(VetReviewApi vetReviewApi);
}
听众:
@KafkaListener(groupId = "pet-clinic", offsetReset = OffsetReset.EARLIEST)
@AllArgsConstructor
@Slf4j
public class VetReviewListener {
private VetService vetService;
@Topic("vet-reviews")
public void receive(@MessageBody VetReviewApi vetReviewApi) {
log.info("Received vet review: {}", vetReviewApi);
vetService.processVetReview(vetReviewApi);
}
}
虽然在主题中发布了消息,但监听器并没有拉取新消息。我关注了官方micronaut-kafka integration documentation,但是没有用
我知道这个问题是不久前发布的,但我没有看到任何答案,而且我无法通过评论“恢复”这个问题,因为我没有足够的声誉。
Link to old post
更新问题。我已经更改了 Kafka docker 图像,仅此而已。
当前工作 kafka-zookeper-kafdrop 配置:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.0.1
container_name: kafka
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafdrop:
image: obsidiandynamics/kafdrop
ports:
- 9100:9000
environment:
- KAFKA_BROKERCONNECT=kafka:29092
- JVM_OPTS=-Xms32M -Xmx64M
depends_on:
- kafka
我目前正在尝试熟悉 micronaut 和 Kafka。我已经按照文档在微服务中创建了一个 KafkaClient,在另一个微服务中创建了一个 KafkaListener。代码如下所示:
客户。这是按预期工作的。我正在兽医评论主题中发布新消息:
@KafkaClient
public interface VetReviewClient {
@Topic("vet-reviews")
void send(VetReviewApi vetReviewApi);
}
听众:
@KafkaListener(groupId = "pet-clinic", offsetReset = OffsetReset.EARLIEST)
@AllArgsConstructor
@Slf4j
public class VetReviewListener {
private VetService vetService;
@Topic("vet-reviews")
public void receive(@MessageBody VetReviewApi vetReviewApi) {
log.info("Received vet review: {}", vetReviewApi);
vetService.processVetReview(vetReviewApi);
}
}
虽然在主题中发布了消息,但监听器并没有拉取新消息。我关注了官方micronaut-kafka integration documentation,但是没有用
我知道这个问题是不久前发布的,但我没有看到任何答案,而且我无法通过评论“恢复”这个问题,因为我没有足够的声誉。
Link to old post
更新问题。我已经更改了 Kafka docker 图像,仅此而已。 当前工作 kafka-zookeper-kafdrop 配置:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.0.1
container_name: kafka
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafdrop:
image: obsidiandynamics/kafdrop
ports:
- 9100:9000
environment:
- KAFKA_BROKERCONNECT=kafka:29092
- JVM_OPTS=-Xms32M -Xmx64M
depends_on:
- kafka