Confluent Control Center not intercepting streams
I am using CCC with a Kafka stream that is populated by Debezium's Postgres connector.
I am using the following docker-compose.yml:
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
      - /path/to/something/zk1/zk-data:/var/lib/zookeeper/data
      - /path/to/something/zk1/zk-txn-logs:/var/lib/zookeeper/log
    ports:
      - 22181:22181
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    container_name: zookeeper-2
    volumes:
      - /path/to/something/zk2/zk-data:/var/lib/zookeeper/data
      - /path/to/something/zk2/zk-txn-logs:/var/lib/zookeeper/log
    ports:
      - 32181:32181
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    container_name: zookeeper-3
    volumes:
      - /path/to/something/zk3/zk-data:/var/lib/zookeeper/data
      - /path/to/something/zk3/zk-txn-logs:/var/lib/zookeeper/log
    ports:
      - 42181:42181
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888
  kafka-1:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-1
    container_name: kafka-1
    volumes:
      - /path/to/something/kafka1/kafka-data:/var/lib/kafka/data
    ports:
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
  kafka-2:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-2
    container_name: kafka-2
    volumes:
      - /path/to/something/kafka2/kafka-data:/var/lib/kafka/data
    ports:
      - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19093
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
  kafka-3:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-3
    container_name: kafka-3
    volumes:
      - /path/to/something/kafka3/kafka-data:/var/lib/kafka/data
    ports:
      - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19094
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - schema-registry
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    volumes:
      - /path/to/something/postgres-source-connector:/usr/share/java/postgres-source-connector
      - /path/to/something/mongodb-sink-connector:/usr/share/java/mongodb-sink-connector
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 3145728
      CONNECT_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - schema-registry
      - connect
      - ksql-server
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONTROL_CENTER_CONNECT_CLUSTER: 'http://connect:8083'
      CONTTROL_CENTER_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONTROL_CENTER_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_CONNECT_CLUSTER: "http://connect:8083"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "https://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_CUB_KAFKA_TIMEOUT: 300
      PORT: 9021
  ksql-server:
    image: confluentinc/cp-ksql-server:latest
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CUB_KAFKA_TIMEOUT: 300
      KSQL_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_COMMIT_INTERVAL_MS: 2000
      KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000
      KSQL_KSQL_AUTO_OFFSET_RESET: earliest
  ksql-cli:
    image: confluentinc/cp-ksql-cli:latest
    hostname: ksql-cli
    container_name: ksql-cli
    depends_on:
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true
  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
      - schema-registry
    ports:
      - 8082:8082
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
  postgres:
    image: debezium/postgres
    hostname: postgres
    container_name: postgres
    volumes:
      - /path/to/something/postgres:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: some-db
    ports:
      - 5432:5432
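I have mapped the Postgres connector into Kafka Connect (via volumes in the Compose file), and I can see it in CCC when creating a new source connector.
When I create the source connector, I can see log messages indicating that the topic for this connector has been created. I also see this topic in the Connect area of CCC. I can also see that Connect is able to authenticate to Postgres through this connector.
When I make a change to the table I specified in the connector, I see Kafka (I have a cluster of 3) determine which broker will store the message. In other words, the Postgres transaction log produces a message on the appropriate topic in response to my change, so the database, the connector, and Kafka are all working.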
However, no matter what I do, I cannot get anything to show up in Data Streams or System Health (> Topics and > Brokers areas) (Edit: this works now. Data Streams still does not).
I have no idea what is wrong. The only indication I get is the initial message saying:
Double check to see if monitoring interceptors have been properly configured for any clients producing to or consuming from the cluster controlcenter.cluster
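My impression is that this basically refers to my Control Center container being configured with the *_INTERCEPTOR_CLASSES settings I pasted above. I followed the link in this message, which takes you to their documentation site, which says to check the response from the web service that serves the Kafka data. As their docs suggest it might be, the response I get is just {}, indicating that Kafka says it has no data. But it does.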
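Is this saying that I somehow need to configure these interceptors into the connector as well? I don't know what it means to set up monitoring interceptors for any consumers/producers. I don't have any raw Java consumers/producers (yet), only the source connector for now.
My connector config is as follows (created via the CCC UI), in case it matters: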
{
  "database.server.name": "my-namespace",
  "database.dbname": "my-database",
  "database.hostname": "my-hostname",
  "database.port": "5432",
  "database.user": "admin",
  "schema.whitelist": "public",
  "table.whitelist": "my-database.my-table",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "name": "my-connector",
  "database.password": "its correct"
}
On starting all the services, I see the following in the respective logs, which I thought might be of interest (in no particular order):
control-center | 2018-09-17T20:45:02.748463792Z interceptor.classes = []
kafka-2 | 2018-09-17T20:44:56.293701931Z interceptor.classes = []
schema-registry | 2018-09-17T20:45:34.658065846Z interceptor.classes = []
connect | 2018-09-17T20:48:52.628218936Z [2018-09-17 20:48:52,628] WARN The configuration 'producer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
connect | 2018-09-17T20:48:52.628472218Z [2018-09-17 20:48:52,628] WARN The configuration 'consumer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Any help is appreciated. Thanks!
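You are referencing the 5.1.0 JAR for the interceptors, which does not exist in the latest image. If you run docker-compose exec connect bash and go to the defined path, you will see which version is there (currently 5.0.0 in latest). So change your Compose file to: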
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
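If you would rather not hard-code the version, the check described above can be sketched in a few lines of Python. This is only a minimal sketch: the function name find_interceptor_jars is made up for illustration, the directory is the one from the CLASSPATH line above, and it assumes you run it somewhere that directory is readable (for example inside the connect container, if a Python interpreter is available there).

```python
import glob
import os


def find_interceptor_jars(directory):
    """Return the monitoring-interceptors JAR paths found in `directory`, sorted."""
    pattern = os.path.join(directory, "monitoring-interceptors-*.jar")
    return sorted(glob.glob(pattern))


if __name__ == "__main__":
    # Directory taken from the CLASSPATH setting above; adjust if your image differs.
    jars = find_interceptor_jars("/usr/share/java/monitoring-interceptors")
    if jars:
        # Print a line that can be pasted into the Compose file.
        print("CLASSPATH: " + jars[-1])
    else:
        print("no monitoring-interceptors JAR found")
```

Whatever path it prints is the value the CLASSPATH entry in the Compose file should point to.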
See https://github.com/rmoff/ksql/blob/clickstream-c3/ksql-clickstream-demo/docker-compose.yml for a working example of Docker Compose with Confluent Control Center and the interceptors working with Kafka Connect (and KSQL, if you are interested).
To debug further, check:
The Kafka Connect log file. If the interceptors are working correctly, you should see:
[2018-03-02 11:39:38,594] INFO ConsumerConfig values:
[...]
interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor]
[2018-03-02 11:39:38,806] INFO ProducerConfig values:
[...]
interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor]
[2018-03-02 11:39:39,455] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor:74)
[2018-03-02 11:39:39,456] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor:70)
[2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
confluent.monitoring.interceptor.publishMs = 15000
confluent.monitoring.interceptor.topic = _confluent-monitoring
(io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
[2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
confluent.monitoring.interceptor.publishMs = 15000
confluent.monitoring.interceptor.topic = _confluent-monitoring
(io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
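To avoid eyeballing a long log for these lines, a small script can pull out every interceptor.classes setting and tell you whether a monitoring interceptor was ever configured. This is a rough sketch, not an official tool: the function names are hypothetical, and it assumes the log has the "interceptor.classes = [...]" shape shown above.

```python
import re

# Package prefix of the monitoring interceptor classes named in the log above.
MONITORING_PREFIX = "io.confluent.monitoring.clients.interceptor."
# Matches lines such as "interceptor.classes = [a.b.C, d.e.F]" or "interceptor.classes = []".
LINE_RE = re.compile(r"interceptor\.classes\s*=\s*\[(.*?)\]")


def interceptor_settings(log_text):
    """Return one list of class names per `interceptor.classes = [...]` entry."""
    settings = []
    for match in LINE_RE.finditer(log_text):
        classes = [c.strip() for c in match.group(1).split(",") if c.strip()]
        settings.append(classes)
    return settings


def has_monitoring_interceptor(log_text):
    """True if at least one entry lists a Confluent monitoring interceptor."""
    return any(
        name.startswith(MONITORING_PREFIX)
        for classes in interceptor_settings(log_text)
        for name in classes
    )
```

For example, save the output of docker-compose logs connect to a file and pass its contents to has_monitoring_interceptor. A log that only ever contains interceptor.classes = [], as in the question, gives False.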
See the Confluent Control Center troubleshooting doc for details of control-center-console-consumer, which you can use to check the actual interceptor data being received (or not, if you haven't set things up correctly).
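One more thing that may be worth checking is the spelling of the environment variable names. As far as I understand the Confluent Docker images, a variable such as CONNECT_PRODUCER_INTERCEPTOR_CLASSES is turned into the property producer.interceptor.classes (which matches the key in the Connect warnings in the question), and a variable without the expected prefix, such as the CONTTROL_CENTER_... one in the Compose file, would presumably just be ignored. The sketch below illustrates that naming rule only. The function name is made up, and the underscore rule (single to dot, double to dash, triple to underscore) is my understanding of the images' convention rather than something taken from the question.

```python
def env_to_property(name, prefix):
    """Map an environment variable name to a property key, or None if the prefix does not match.

    Assumed convention: after stripping the prefix and lowercasing,
    "___" becomes "_", "__" becomes "-", and "_" becomes ".".
    """
    if not name.startswith(prefix):
        return None
    rest = name[len(prefix):].lower()
    # Use a placeholder so the triple-underscore case survives the later replacements.
    rest = rest.replace("___", "\0").replace("__", "-").replace("_", ".")
    return rest.replace("\0", "_")


if __name__ == "__main__":
    print(env_to_property("CONNECT_PRODUCER_INTERCEPTOR_CLASSES", "CONNECT_"))
    # The misspelled prefix from the Compose file does not match, so this prints None.
    print(env_to_property("CONTTROL_CENTER_PRODUCER_INTERCEPTOR_CLASSES", "CONTROL_CENTER_"))
```

Each image may also put its own prefix in front of the resulting key, so treat the output as a hint about the naming, not as the exact property that ends up in the generated config file.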