How to access Kafka bridge
I am using Strimzi, and I want to learn how to use the Kafka Bridge and understand how it works. I created a Kafka cluster with the following YAML file:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.6"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Here is the YAML for the Kafka Bridge:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
ist@ist-1207:~$ kubectl get svc -A
default kubernetes ClusterIP <none> 443/TCP 3d1h
kube-system kube-dns ClusterIP <none> 53/UDP,53/TCP,9153/TCP 3d1h
strimzi my-bridge-bridge-service ClusterIP <none> 8080/TCP 88m
strimzi my-cluster-kafka-bootstrap ClusterIP <none> 9091/TCP,9092/TCP,9093/TCP 109m
strimzi my-cluster-kafka-brokers ClusterIP None <none> 9091/TCP,9092/TCP,9093/TCP 109m
strimzi my-cluster-zookeeper-client ClusterIP <none> 2181/TCP 110m
strimzi my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 110m
strimzi my-connect-cluster-connect-api ClusterIP <none> 8083/TCP 25m
Now I exec into the bridge pod with the following command and run these requests:
$ kubectl exec -it my-bridge-bridge-684df9fc64-bktc4 -n strimzi bash
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group -H 'content-type: application/vnd.kafka.v2+json' -d '{
"name": "your-consumer",
"format": "json",
"auto.offset.reset": "earliest",
"enable.auto.commit": false
}'
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group/instances/your-consumer/subscription -H 'content-type: application/vnd.kafka.v2+json' -d '{
"topics": [
"your-topic"
]
}'
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
> -H 'accept: application/vnd.kafka.json.v2+json'
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST \
> http://localhost:8080/topics/your-topic \
> -H 'content-type: application/vnd.kafka.json.v2+json' \
> -d '{
> "records": [
> {
> "key": "key-1",
> "value": "kajal verma"
> },
> {
> "key": "key-2",
> "value": "Aman verma"
> }
> ]
> }'
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
> -H 'accept: application/vnd.kafka.json.v2+json'
These are the logs of the bridge pod:
ist@ist-1207:~$ kubectl logs -f my-bridge-bridge-684df9fc64-lstx2 -n strimzi
Kafka Bridge configuration:
#Bridge configuration
#Kafka common properties
#Apache Kafka Producer
#Apache Kafka Consumer
#HTTP configuration
[2020-11-24 11:42:01,374] INFO <Application :64> [main ] Strimzi Kafka Bridge 0.19.0 is starting
[2020-11-24 11:42:02,939] WARN <onMetaSchema:337> [oop-thread-1] Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[2020-11-24 11:42:03,402] INFO <HttpBridge :180> [oop-thread-1] Starting HTTP-Kafka bridge verticle...
[2020-11-24 11:42:03,459] INFO <ClientConfig:354> [oop-thread-1] AdminClientConfig values:
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
[2020-11-24 11:42:04,336] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:42:04,336] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:42:04,338] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218124265
Nov 24, 2020 11:42:05 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2656 ms, time limit is 2000 ms
Nov 24, 2020 11:42:06 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3869 ms, time limit is 2000 ms
[2020-11-24 11:42:07,589] INFO <HttpBridge :102> [oop-thread-1] HTTP-Kafka Bridge started and listening on port 8080
[2020-11-24 11:42:07,604] INFO <HttpBridge :103> [oop-thread-1] HTTP-Kafka Bridge bootstrap servers my-cluster-kafka-bootstrap:9092
[2020-11-24 11:42:07,609] INFO <Application :219> [oop-thread-0] HTTP verticle instance deployed [6bc2ded6-162c-4444-b8ac-f51c0573e389]
[2020-11-24 11:44:15,295] WARN <etworkClient:757> [dminclient-1] [AdminClient clientId=adminclient-1] Connection to node -1 (my-cluster-kafka-bootstrap/ could not be established. Broker may not be available.
[2020-11-24 11:44:15,301] INFO <adataManager:235> [dminclient-1] [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1606218154377, tries=1, nextAllowedTryMs=1606218255400) timed out at 1606218255300 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2020-11-24 11:46:09,397] INFO <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Request: from, method = POST, path = /consumers/my-group
[2020-11-24 11:46:09,508] INFO <nsumerConfig:354> [oop-thread-1] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = your-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
[2020-11-24 11:46:09,759] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:46:09,759] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:46:09,760] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218369758
[2020-11-24 11:46:09,775] INFO <idgeEndpoint:140> [oop-thread-1] Created consumer your-consumer in group my-group
[2020-11-24 11:46:09,781] INFO <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Response: statusCode = 200, message = OK
[2020-11-24 11:46:33,188] INFO <subscribe :85> [oop-thread-1] [343058276] SUBSCRIBE Request: from, method = POST, path = /consumers/my-group/instances/your-consumer/subscription
[2020-11-24 11:46:33,199] INFO <idgeEndpoint:191> [oop-thread-1] Subscribe to topics [SinkTopicSubscription(topic=your-topic,partition=null,offset=null)]
[2020-11-24 11:46:33,224] INFO <subscribe :85> [oop-thread-1] [343058276] SUBSCRIBE Response: statusCode = 200, message = OK
[2020-11-24 11:46:33,231] INFO <afkaConsumer:965> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Subscribed to topic(s): your-topic
[2020-11-24 11:46:58,713] INFO <poll :85> [oop-thread-1] [690326198] POLL Request: from, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:46:58,756] INFO <poll :85> [oop-thread-1] [690326198] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:47:25,358] INFO <send :85> [oop-thread-1] [859539310] SEND Request: from, method = POST, path = /topics/your-topic
[2020-11-24 11:47:25,474] INFO <oducerConfig:354> [oop-thread-1] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-11-24 11:47:25,611] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,631] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,631] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445608
[2020-11-24 11:47:25,649] INFO <oducerConfig:354> [oop-thread-1] ProducerConfig values:
acks = 0
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-11-24 11:47:25,670] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,671] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,672] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445665
[2020-11-24 11:47:25,673] INFO <Metadata :279> [| producer-1] [Producer clientId=producer-1] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,685] INFO <Metadata :279> [| producer-2] [Producer clientId=producer-2] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,714] INFO <send :85> [oop-thread-1] [859539310] SEND Response: statusCode = 200, message = OK
[2020-11-24 11:47:25,938] INFO <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:25,944] INFO <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:41,679] INFO <poll :85> [oop-thread-1] [484189988] POLL Request: from, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:47:41,682] WARN <etworkClient:1073> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Error while fetching metadata with correlation id 2 : {your-topic=LEADER_NOT_AVAILABLE}
[2020-11-24 11:47:41,684] INFO <Metadata :279> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:41,688] INFO <poll :85> [oop-thread-1] [484189988] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:53:01,882] INFO <poll :85> [oop-thread-1] [1610292326] POLL Request: from, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:53:01,883] INFO <poll :85> [oop-thread-1] [1610292326] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:53:01,946] INFO <tCoordinator:815> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.strimzi.svc:9092 (id: 2147483645 rack: null)
[2020-11-24 11:53:01,979] INFO <tCoordinator:553> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] (Re-)joining group
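I can't be completely sure about the Strimzi version, but to me it just looks like the script is not in $PATH or its name is slightly different. For example, this is from one of my clusters (although I use Confluent's Kafka Helm chart):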
root@confluent-kafka-connect-6bf9c944f-fzbgc:/# which kafka-topics
root@confluent-kafka-connect-6bf9c944f-fzbgc:/# kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
The Kafka Bridge service is an HTTP service and has no Kafka CLI tools (and even if it did, port 8080 would not respond to that command).
If you want to list topics, use a GET /topics request.
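As a rough illustration of the GET /topics request, here is a minimal Python sketch using only the standard library. It assumes the bridge is reachable at http://localhost:8080, as in the curl commands from the question (for example from inside the bridge pod). The helper names `build_list_topics_request` and `list_topics` are hypothetical and not part of the bridge.

```python
import json
import urllib.request

# Assumption: same address as the curl commands in the question.
BRIDGE_URL = "http://localhost:8080"


def build_list_topics_request(base_url: str = BRIDGE_URL) -> urllib.request.Request:
    """Build (but do not send) the GET /topics request."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/topics",
        method="GET",
        headers={"Accept": "application/vnd.kafka.v2+json"},
    )


def list_topics(base_url: str = BRIDGE_URL, timeout: float = 10.0) -> list:
    """Send the request and return the parsed JSON body (a list of topic names)."""
    request = build_list_topics_request(base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `list_topics()` from a machine that can reach the bridge should return the topic names as a Python list. The same request with curl would be `curl -X GET http://localhost:8080/topics`.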
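You are trying to use the kafka-topics tool inside the bridge pod, which does not contain the Kafka binaries, so kafka-topics is not available there.
The command you are trying to run can be executed on one of the Kafka pods you deployed, where the tool is available.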
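A small sketch of what running the tool on a Kafka pod could look like, written as a Python helper that only assembles the kubectl command line. Several things here are assumptions: the pod name `my-cluster-kafka-0` (inferred from the broker naming visible in the logs), the script path `bin/kafka-topics.sh` relative to the container's working directory, and the plain listener on `localhost:9092`. The function name is hypothetical.

```python
import shlex


def kafka_topics_list_command(
    pod: str = "my-cluster-kafka-0",      # assumption: first broker pod of "my-cluster"
    namespace: str = "strimzi",           # namespace shown in the question
    bootstrap: str = "localhost:9092",    # assumption: plain internal listener
) -> list:
    """Return the argv for listing topics from inside a Kafka broker pod."""
    return [
        "kubectl", "exec", "-n", namespace, pod, "--",
        "bin/kafka-topics.sh",            # assumption: script location in the image
        "--list", "--bootstrap-server", bootstrap,
    ]


command = kafka_topics_list_command()
# Print the command so it can be copied into a shell,
# or pass the list to subprocess.run(command, check=True).
print(shlex.join(command))
```

Building the argument list separately keeps the sketch free of side effects. Adjust the pod name and script path to whatever `kubectl get pods -n strimzi` and the image actually show.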
The bridge only provides an HTTP interface to Kafka on port 8080, so you can issue GET/POST requests against the bridge endpoints from a pod inside the Kubernetes cluster to interact with Kafka over HTTP.
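To make the in-cluster GET/POST interaction concrete, here is a minimal Python sketch that builds the same produce and poll requests as the curl commands in the question. It addresses the bridge through its Service instead of localhost. The DNS name `my-bridge-bridge-service.strimzi.svc` is an assumption derived from the Service name and namespace in the `kubectl get svc -A` output, and all function names are hypothetical.

```python
import json
import urllib.request

# Assumption: in-cluster DNS name of the bridge Service listed in the question.
BRIDGE_URL = "http://my-bridge-bridge-service.strimzi.svc:8080"


def build_produce_request(topic: str, records: list,
                          base_url: str = BRIDGE_URL) -> urllib.request.Request:
    """Build a POST /topics/{topic} request carrying JSON records."""
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/topics/{topic}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    )


def build_poll_request(group: str, consumer: str,
                       base_url: str = BRIDGE_URL) -> urllib.request.Request:
    """Build a GET request that polls records for an existing, subscribed consumer."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/consumers/{group}/instances/{consumer}/records",
        method="GET",
        headers={"Accept": "application/vnd.kafka.json.v2+json"},
    )


def send(request: urllib.request.Request, timeout: float = 10.0):
    """Send a prepared request and parse the JSON response body."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

From a pod in the cluster, `send(build_produce_request("your-topic", [{"key": "key-1", "value": "kajal verma"}]))` would mirror the produce call from the question, and `send(build_poll_request("my-group", "your-consumer"))` would mirror the poll. The consumer has to be created and subscribed first, as in the earlier curl steps.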
If you want to expose the bridge outside the Kubernetes cluster, you can read more about it here: https://strimzi.io/blog/2019/11/05/exposing-http-bridge/
