Why can I see only every other message in this Kafka example?
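I'm trying to modify one of the spring cloud stream samples, and the results I'm getting are confusing: even though I have registered only one stream listener for my channel, I receive only every second message. I suspect this is caused by default load balancing on the single kafka partition, but I don't know how to confirm that.
docker ps shows only one kafka broker instance up: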
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e058697a3bb2 wurstmeister/kafka "start-kafka.sh" 5 minutes ago Up 5 minutes 0.0.0.0:9092->9092/tcp kafka-uppercase-tx
d001389ddfa4 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 5 minutes ago Up 5 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp uppercasetransformer_zookeeper_1
Checking with the kafka console consumer also yields only one kind of response, though this time it is only BAR:
/opt/kafka_2.12-2.1.0/bin # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
BAR
BAR
BAR
BAR
BAR
Checking the consumer group description and its members doesn't show any extra consumers, so my load balancing theory fails here:
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
input 0 0 0 0 consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2
/opt/kafka_2.12-2.1.0/bin # ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group anonymous.0559a5b9-7876-4523-8e6c-74eb03d305a3 --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-2-955118a4-3597-48a3-8fdd-3823fc366068 /172.22.0.1 consumer-2 1
I don't see anything wrong with the topic description either:
/opt/kafka_2.12-2.1.0/bin # ./kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic output
Topic:output PartitionCount:1 ReplicationFactor:1 Configs:
Topic: output Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Why does my output channel deliver only every second message, and how can I check this myself?
kafka-demo.java:
package demo;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Processor.class)
public class UppercaseTransformer {

    private static Logger logger = LoggerFactory.getLogger(UppercaseTransformer.class);

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String payload) {
        logger.info("transforming payload {}", payload);
        return payload.toUpperCase();
    }

    static class TestSource {

        private AtomicLong longSemaphore = new AtomicLong(0L);

        @Bean
        @InboundChannelAdapter(channel = "input", poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () ->
            {
                final long semaphoreValue = longSemaphore.getAndIncrement();
                final boolean condition = semaphoreValue % 2 == 0;
                final String foobar = condition ? "foo" : "bar";
                logger.info("semaphoreValue: {}, condition: {}, foobar: {}", semaphoreValue, condition, foobar);
                return new GenericMessage<>(foobar);
            };
        }

        @StreamListener(Processor.OUTPUT)
        public void receive(String payload) {
            logger.info("Data received: {}", payload);
        }
    }
}
Logs:
2019-08-05 22:48:02.971 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 2, condition: true, foobar: foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:02.972 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:03.973 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 3, condition: false, foobar: bar
2019-08-05 22:48:03.974 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:04.976 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : semaphoreValue: 4, condition: true, foobar: foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:04.977 INFO 28843 --- [ask-scheduler-3] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : semaphoreValue: 5, condition: false, foobar: bar
2019-08-05 22:48:05.978 INFO 28843 --- [ask-scheduler-2] demo.UppercaseTransformer : transforming payload bar
2019-08-05 22:48:06.980 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : semaphoreValue: 6, condition: true, foobar: foo
2019-08-05 22:48:06.981 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : transforming payload foo
2019-08-05 22:48:06.982 INFO 28843 --- [ask-scheduler-4] demo.UppercaseTransformer : Data received: FOO
2019-08-05 22:48:07.982 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : semaphoreValue: 7, condition: false, foobar: bar
2019-08-05 22:48:07.983 INFO 28843 --- [ask-scheduler-1] demo.UppercaseTransformer : transforming payload bar
application-local.yml:
spring:
  cloud:
    stream:
#      bindings:
#        output:
#          destination: xformed
#        test-sink:
#          destination: xformed
#        input:
#          destination: testtock
#        test-source:
#          destination: testtock
      default-binder: kafka
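You have two consumers on the output channel: the binding to the topic and your receive() service activator.
The default round-robin dispatching sends messages alternately to your service activator and to the topic. That is why the application log shows only FOO under "Data received" while the console consumer on the topic shows only BAR.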
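To make the alternation concrete, here is a minimal plain-Java sketch of round-robin dispatch between two subscribers. It is a simplified model, not Spring Integration's actual implementation: the `RoundRobinDemo` and `RoundRobinChannel` names and the `run` helper are hypothetical, and the subscription order (listener first, topic binding second) is an assumption chosen to match the logs in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RoundRobinDemo {

    // Hypothetical, simplified model of a point-to-point channel:
    // each message goes to exactly one subscriber, chosen in rotation.
    static final class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String payload) {
            Consumer<String> target = subscribers.get(next);
            next = (next + 1) % subscribers.size();
            target.accept(payload);
        }
    }

    // Sends `count` alternating foo/bar payloads (uppercased) and returns what
    // each subscriber saw: index 0 = listener, index 1 = topic binding.
    static List<List<String>> run(int count) {
        List<String> listenerSaw = new ArrayList<>();
        List<String> topicSaw = new ArrayList<>();

        RoundRobinChannel output = new RoundRobinChannel();
        output.subscribe(listenerSaw::add); // stands in for receive()
        output.subscribe(topicSaw::add);    // stands in for the Kafka topic binding

        for (long i = 0; i < count; i++) {
            String payload = (i % 2 == 0) ? "foo" : "bar";
            output.send(payload.toUpperCase());
        }

        List<List<String>> result = new ArrayList<>();
        result.add(listenerSaw);
        result.add(topicSaw);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> seen = run(6);
        System.out.println("listener: " + seen.get(0)); // [FOO, FOO, FOO]
        System.out.println("topic:    " + seen.get(1)); // [BAR, BAR, BAR]
    }
}
```

Because the source alternates foo and bar in step with the rotation, each subscriber always sees the same value, which matches both the application log and the console consumer output. If the goal is to observe what actually reaches the topic, one likely option is to consume from the topic through a separate input binding rather than subscribing a listener to the output channel itself.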