Why can I see only every other message in this Kafka example?

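I'm trying to modify one of the spring cloud stream samples, and the results I'm getting are confusing: although I've registered only one stream listener for my channel, I receive only every other message. I suspect this is caused by default load balancing on a single kafka partition, but I don't know how to confirm that.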

docker ps shows only one kafka broker instance running:

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
e058697a3bb2        wurstmeister/kafka       "start-kafka.sh"         5 minutes ago       Up 5 minutes        0.0.0.0:9092->9092/tcp                               kafka-uppercase-tx
d001389ddfa4        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   5 minutes ago       Up 5 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   uppercasetransformer_zookeeper_1

Checking with the kafka console consumer also shows only one kind of message, though this time it's only BAR:

/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR

Describing the consumer group and its members doesn't reveal any additional consumers, so my load balancing theory fails here:

/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
input           0          0               0               0               consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1     consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members

CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1     consumer-2      1

I can't see anything wrong with the topic description either:

/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: output   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Why does my listener on the output channel receive only every other message, and how can I check this on my own?

kafka-demo.java:

package demo;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Processor.class)
public class UppercaseTransformer {

    private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String payload) {
        logger.info("transforming payload {}", payload);
        return payload.toUpperCase();
    }

    static class TestSource {
        private AtomicLong longSemaphore = new AtomicLong(0L);

        @Bean
        @InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () ->
            {
                final long semaphoreValue = longSemaphore.getAndIncrement();
                final boolean condition = semaphoreValue % 2 == 0;
                final String foobar = condition ? "foo" : "bar";
                logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
                return new GenericMessage<>(foobar);
            };

        }

        @StreamListener(Processor.OUTPUT)
        public void receive(String payload) {
            logger.info("Data received: {}", payload);
        }
    }
}

Logs:

2019-08-05 22:48:02.971  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:02.972  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:03.973  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : transforming payload bar
2019-08-05 22:48:04.976  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:04.977  INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:05.978  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978  INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer                : transforming payload bar
2019-08-05 22:48:06.980  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : transforming payload foo
2019-08-05 22:48:06.982  INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer                : Data received: FOO
2019-08-05 22:48:07.982  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983  INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer                : transforming payload bar

application-local.yml:

spring:
  cloud:
    stream:
#      bindings:
#        output:
#          destination: xformed
#        test-sink:
#          destination: xformed
#        input:
#          destination: testtock
#        test-source:
#          destination: testtock
      default-binder: kafka

You have two consumers on the output channel: the binding to the Kafka topic and your receive() listener.

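The channel's default round-robin dispatching sends messages alternately to your receive() listener and to the topic, which is why the application logs show only FOO and the console consumer shows only BAR.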
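To see the effect in isolation, here is a minimal plain-Java sketch of round-robin dispatch with two subscribers. It does not use any Spring classes: RoundRobinChannel, RoundRobinDemo and run() are hypothetical names made up for this illustration, and the subscription order (listener first, topic binding second) is an assumption chosen to match the logs in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class RoundRobinDemo {

    /** Toy stand-in for a channel that hands each message to exactly one subscriber, in rotation. */
    static class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            // Pick the next subscriber in rotation; the others never see this message.
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(message);
        }
    }

    /** Sends FOO and BAR alternately; returns [messages seen by the listener, messages seen by the topic]. */
    static List<List<String>> run(int count) {
        List<String> seenByListener = new ArrayList<>();
        List<String> seenByTopic = new ArrayList<>();

        RoundRobinChannel output = new RoundRobinChannel();
        output.subscribe(seenByListener::add); // stands in for receive() (assumed to subscribe first)
        output.subscribe(seenByTopic::add);    // stands in for the Kafka topic binding

        for (int i = 0; i < count; i++) {
            output.send(i % 2 == 0 ? "FOO" : "BAR");
        }
        return Arrays.asList(seenByListener, seenByTopic);
    }

    public static void main(String[] args) {
        List<List<String>> result = run(6);
        System.out.println("listener: " + result.get(0)); // listener: [FOO, FOO, FOO]
        System.out.println("topic:    " + result.get(1)); // topic:    [BAR, BAR, BAR]
    }
}
```

Because the producer alternates foo/bar and the dispatcher alternates subscribers, each subscriber always gets the same value. If the goal is for the listener to see every message, one option is presumably to have it consume from the topic through a separate input binding instead of subscribing to the output channel directly.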