为什么我的 Kafka S3 连接器接收器在创建后删除我的主题(Kafka 连接器重新启动)?
Why does my Kafka S3 Connector Sink delete my topic after creation (Kafka Connector Restarts)?
我正在使用 Confluent 的 Kafka 连接器将数据从 Kafka 接收到 MinIO 存储桶。我在 Kubernetes 环境中使用 io.confluent.connect.s3.S3SinkConnector
。这是我当前的 S3 接收器配置:
kafkaConnectSpec:
class: io.confluent.connect.s3.S3SinkConnector
config:
tasks.max: 1
topics: topic-name
s3.bucket.name: bucket
flush.size: 1
store.url: http://minio:9000
format.class: io.confluent.connect.s3.format.json.JsonFormat
storage.class: io.confluent.connect.s3.storage.S3Storage
云环境部署后,客户希望能够动态控制主题(即随意增删主题)。虽然我明白为什么这可能不理想,但我向上级屈服。
因此,为了执行主题加法,我正在使用 Kafka REST API:
def update_sink(topic, connector):
configuration = requests.get("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector)).json()
if "config" not in configuration:
return {
"status": 500,
"message": "Kafka Sink " + str(connector) + " does not have a configuration"
}
# Topics must be comma delimited
if "topics" in configuration["config"]:
configuration["config"]["topics"] += ',' + topic
requests.put("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config", json=configuration["config"])
print("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config")
print(configuration["config"])
return {
"status": 200,
"message": "Kafka Sink " + str(connector) + " successfully updated"
}
我知道代码并不漂亮,但它现在可以完成工作。它实质上向 /connectors/my-sink/config
端点发出 PUT 请求,并附加了我的新主题。
这行得通。我的sink有新话题了,可以发消息了
但是,在 3-5 分钟内,我的 Kafka Sink Pod 开始重新启动(我认为)Kafka 连接器:
2021-03-19 23:02:55,086 INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector minio-s3-sink config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [KafkaBasedLog Work Thread - connect-cluster-configs]
2021-03-19 23:02:55,589 INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by restarting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Stopping connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Shutting down S3 connector minio-s3-sink (io.confluent.connect.s3.S3SinkConnector) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO Stopped connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
... # Performs the restart here
此时 pod 失去了话题。
我相信这是由 config.action.reload = restart
配置值引起的。我认为在收到新配置后,连接器将在 N 分钟后重新启动。但是,我似乎找不到任何有关如何更改该行为的文档。也许我应该在我的 PUT 请求期间这样做,但这感觉很老套。也是瞎编的。
有谁知道为什么我的连接器在更新配置的 PUT 请求后重新启动?有什么办法可以防止这种情况吗?
编辑 #1:我尝试添加 config.action.reload = none
,但连接器仍然重新启动。
我查看了 Kafka Operator 中的日志,它没有触发重置。似乎与 Kafka Connector Operator 完全隔离。
问题已记录在 Strimzi 中:
If KafkaConnectors are enabled, manual changes made directly using the Kafka Connect REST API are reverted by the Cluster Operator
https://strimzi.io/docs/operators/latest/deploying.html#availability_of_the_kafka_connect_rest_api
我不知道我的部署中会发生这种情况,但显然我们必须将其关闭。不幸的是,K8 连接器部署非常适合简单的开始。
这是将它们“关闭”的相关配置:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: kafka-connect-cluster
annotations:
# # use-connector-resources configures this KafkaConnect
# # to use KafkaConnector resources to avoid
# # needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "false"
strimzi.io/use-connector-resources: "false"
会成功,因此您无法通过 YAML 文件添加连接器,但您可以通过 REST API 添加连接器(只要 pod 运行,这些更改就会持续存在)
我正在使用 Confluent 的 Kafka 连接器将数据从 Kafka 接收到 MinIO 存储桶。我在 Kubernetes 环境中使用 io.confluent.connect.s3.S3SinkConnector
。这是我当前的 S3 接收器配置:
kafkaConnectSpec:
class: io.confluent.connect.s3.S3SinkConnector
config:
tasks.max: 1
topics: topic-name
s3.bucket.name: bucket
flush.size: 1
store.url: http://minio:9000
format.class: io.confluent.connect.s3.format.json.JsonFormat
storage.class: io.confluent.connect.s3.storage.S3Storage
云环境部署后,客户希望能够动态控制主题(即随意增删主题)。虽然我明白为什么这可能不理想,但我向上级屈服。
因此,为了执行主题加法,我正在使用 Kafka REST API:
def update_sink(topic, connector):
configuration = requests.get("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector)).json()
if "config" not in configuration:
return {
"status": 500,
"message": "Kafka Sink " + str(connector) + " does not have a configuration"
}
# Topics must be comma delimited
if "topics" in configuration["config"]:
configuration["config"]["topics"] += ',' + topic
requests.put("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config", json=configuration["config"])
print("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config")
print(configuration["config"])
return {
"status": 200,
"message": "Kafka Sink " + str(connector) + " successfully updated"
}
我知道代码并不漂亮,但它现在可以完成工作。它实质上向 /connectors/my-sink/config
端点发出 PUT 请求,并附加了我的新主题。
这行得通。我的sink有新话题了,可以发消息了
但是,在 3-5 分钟内,我的 Kafka Sink Pod 开始重新启动(我认为)Kafka 连接器:
2021-03-19 23:02:55,086 INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector minio-s3-sink config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [KafkaBasedLog Work Thread - connect-cluster-configs]
2021-03-19 23:02:55,589 INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by restarting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Stopping connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Shutting down S3 connector minio-s3-sink (io.confluent.connect.s3.S3SinkConnector) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO Stopped connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
... # Performs the restart here
此时 pod 失去了话题。
我相信这是由 config.action.reload = restart
配置值引起的。我认为在收到新配置后,连接器将在 N 分钟后重新启动。但是,我似乎找不到任何有关如何更改该行为的文档。也许我应该在我的 PUT 请求期间这样做,但这感觉很老套。也是瞎编的。
有谁知道为什么我的连接器在更新配置的 PUT 请求后重新启动?有什么办法可以防止这种情况吗?
编辑 #1:我尝试添加 config.action.reload = none
,但连接器仍然重新启动。
我查看了 Kafka Operator 中的日志,它没有触发重置。似乎与 Kafka Connector Operator 完全隔离。
问题已记录在 Strimzi 中:
If KafkaConnectors are enabled, manual changes made directly using the Kafka Connect REST API are reverted by the Cluster Operator
https://strimzi.io/docs/operators/latest/deploying.html#availability_of_the_kafka_connect_rest_api
我不知道我的部署中会发生这种情况,但显然我们必须将其关闭。不幸的是,K8 连接器部署非常适合简单的开始。
这是将它们“关闭”的相关配置:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: kafka-connect-cluster
annotations:
# # use-connector-resources configures this KafkaConnect
# # to use KafkaConnector resources to avoid
# # needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "false"
strimzi.io/use-connector-resources: "false"
会成功,因此您无法通过 YAML 文件添加连接器,但您可以通过 REST API 添加连接器(只要 pod 运行,这些更改就会持续存在)